mr-runner: support multiple SourceOperations per fused step by composing their tagged sources in BeamInputFormat and partitioning records by TupleTag.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e562a443
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e562a443
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e562a443

Branch: refs/heads/mr-runner
Commit: e562a4432d407759876e147fdeb132518a1c9637
Parents: 40396d7
Author: Pei He <p...@apache.org>
Authored: Tue Aug 8 15:49:04 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Thu Aug 31 14:13:49 2017 +0800

----------------------------------------------------------------------
 .../mapreduce/translation/BeamInputFormat.java  | 72 +++++++++++++-------
 .../mapreduce/translation/BeamMapper.java       |  3 +-
 .../translation/FileReadOperation.java          |  9 ++-
 .../translation/FileWriteOperation.java         |  8 +--
 .../mapreduce/translation/GraphPlanner.java     | 51 ++++++++++++--
 .../runners/mapreduce/translation/Graphs.java   |  2 +-
 .../mapreduce/translation/JobPrototype.java     | 28 ++------
 .../mapreduce/translation/Operation.java        |  1 +
 .../translation/PartitionOperation.java         | 72 ++++++++++++++++++++
 .../translation/ReadBoundedTranslator.java      |  3 +-
 .../mapreduce/translation/SourceOperation.java  | 24 +++++--
 .../mapreduce/translation/ViewOperation.java    | 59 ----------------
 .../mapreduce/translation/ViewTranslator.java   | 19 ++++--
 13 files changed, 224 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
index 03a88aa..23534de 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -52,7 +53,7 @@ public class BeamInputFormat<T> extends InputFormat {
 
   private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 
* 1000;
 
-  private List<BoundedSource<T>> sources;
+  private List<SourceOperation.TaggedSource> sources;
   private SerializedPipelineOptions options;
 
   public BeamInputFormat() {
@@ -67,30 +68,37 @@ public class BeamInputFormat<T> extends InputFormat {
         || Strings.isNullOrEmpty(serializedPipelineOptions)) {
       return ImmutableList.of();
     }
-    sources = (List<BoundedSource<T>>) 
SerializableUtils.deserializeFromByteArray(
-        Base64.decodeBase64(serializedBoundedSource), "BoundedSource");
+    sources = (List<SourceOperation.TaggedSource>) 
SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedBoundedSource), "TaggedSources");
     options = ((SerializedPipelineOptions) 
SerializableUtils.deserializeFromByteArray(
         Base64.decodeBase64(serializedPipelineOptions), 
"SerializedPipelineOptions"));
 
     try {
 
       return FluentIterable.from(sources)
-          .transformAndConcat(new Function<BoundedSource<T>, 
Iterable<BoundedSource<T>>>() {
+          .transformAndConcat(
+              new Function<SourceOperation.TaggedSource, 
Iterable<SourceOperation.TaggedSource>>() {
+                @Override
+                public Iterable<SourceOperation.TaggedSource> apply(
+                    final SourceOperation.TaggedSource taggedSource) {
+                  try {
+                    return FluentIterable.from(taggedSource.getSource().split(
+                        DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, 
options.getPipelineOptions()))
+                        .transform(new Function<BoundedSource<?>, 
SourceOperation.TaggedSource>() {
+                          @Override
+                          public SourceOperation.TaggedSource 
apply(BoundedSource<?> input) {
+                            return SourceOperation.TaggedSource.of(input, 
taggedSource.getTag());
+                          }});
+                  } catch (Exception e) {
+                    Throwables.throwIfUnchecked(e);
+                    throw new RuntimeException(e);
+                  }
+                }
+              })
+          .transform(new Function<SourceOperation.TaggedSource, InputSplit>() {
             @Override
-            public Iterable<BoundedSource<T>> apply(BoundedSource<T> input) {
-              try {
-                return (Iterable<BoundedSource<T>>) input.split(
-                    DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, 
options.getPipelineOptions());
-              } catch (Exception e) {
-                Throwables.throwIfUnchecked(e);
-                throw new RuntimeException(e);
-              }
-            }
-          })
-          .transform(new Function<BoundedSource<T>, InputSplit>() {
-            @Override
-            public InputSplit apply(BoundedSource<T> source) {
-              return new BeamInputSplit(source, options);
+            public InputSplit apply(SourceOperation.TaggedSource taggedSource) 
{
+              return new BeamInputSplit(taggedSource.getSource(), options, 
taggedSource.getTag());
             }})
           .toList();
     } catch (Exception e) {
@@ -107,17 +115,23 @@ public class BeamInputFormat<T> extends InputFormat {
   public static class BeamInputSplit<T> extends InputSplit implements Writable 
{
     private BoundedSource<T> boundedSource;
     private SerializedPipelineOptions options;
+    private TupleTag<?> tupleTag;
 
     public BeamInputSplit() {
     }
 
-    public BeamInputSplit(BoundedSource<T> boundedSource, 
SerializedPipelineOptions options) {
+    public BeamInputSplit(
+        BoundedSource<T> boundedSource,
+        SerializedPipelineOptions options,
+        TupleTag<?> tupleTag) {
       this.boundedSource = checkNotNull(boundedSource, "boundedSources");
       this.options = checkNotNull(options, "options");
+      this.tupleTag = checkNotNull(tupleTag, "tupleTag");
     }
 
     public BeamRecordReader<T> createReader() throws IOException {
-      return new 
BeamRecordReader<>(boundedSource.createReader(options.getPipelineOptions()));
+      return new BeamRecordReader<>(
+          boundedSource.createReader(options.getPipelineOptions()), tupleTag);
     }
 
     @Override
@@ -142,6 +156,7 @@ public class BeamInputFormat<T> extends InputFormat {
       ByteArrayOutputStream stream = new ByteArrayOutputStream();
       SerializableCoder.of(BoundedSource.class).encode(boundedSource, stream);
       SerializableCoder.of(SerializedPipelineOptions.class).encode(options, 
stream);
+      SerializableCoder.of(TupleTag.class).encode(tupleTag, stream);
 
       byte[] bytes = stream.toByteArray();
       out.writeInt(bytes.length);
@@ -157,16 +172,19 @@ public class BeamInputFormat<T> extends InputFormat {
       ByteArrayInputStream inStream = new ByteArrayInputStream(bytes);
       boundedSource = 
SerializableCoder.of(BoundedSource.class).decode(inStream);
       options = 
SerializableCoder.of(SerializedPipelineOptions.class).decode(inStream);
+      tupleTag = SerializableCoder.of(TupleTag.class).decode(inStream);
     }
   }
 
   private static class BeamRecordReader<T> extends RecordReader {
 
     private final BoundedSource.BoundedReader<T> reader;
+    private TupleTag<?> tupleTag;
     private boolean started;
 
-    public BeamRecordReader(BoundedSource.BoundedReader<T> reader) {
+    public BeamRecordReader(BoundedSource.BoundedReader<T> reader, TupleTag<?> 
tupleTag) {
       this.reader = checkNotNull(reader, "reader");
+      this.tupleTag = checkNotNull(tupleTag, "tupleTag");
       this.started = false;
     }
 
@@ -187,13 +205,19 @@ public class BeamInputFormat<T> extends InputFormat {
 
     @Override
     public Object getCurrentKey() throws IOException, InterruptedException {
-      return "global";
+      return tupleTag;
     }
 
     @Override
     public Object getCurrentValue() throws IOException, InterruptedException {
-      return WindowedValue.timestampedValueInGlobalWindow(
-          reader.getCurrent(), reader.getCurrentTimestamp());
+      // TODO: hack. Reads from materialized PCollections (e.g. FileReadOperation) already
+      // return WindowedValue, so pass those through; wrap anything else in the global window.
+      if (reader.getCurrent() instanceof WindowedValue) {
+        return reader.getCurrent();
+      } else {
+        return WindowedValue.timestampedValueInGlobalWindow(
+            reader.getCurrent(), reader.getCurrentTimestamp());
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
index d3ebb5c..b03236f 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -22,6 +22,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import java.io.IOException;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
@@ -57,7 +58,7 @@ public class BeamMapper<ValueInT, ValueOutT>
       Mapper<Object, WindowedValue<ValueInT>, Object, 
WindowedValue<ValueOutT>>.Context context)
       throws IOException, InterruptedException {
     LOG.info("key: {} value: {}.", key, value);
-    operation.process(value);
+    operation.process(WindowedValue.valueInGlobalWindow(KV.of(key, value)));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
index 674e30a..6bd893a 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -45,8 +46,12 @@ import org.apache.hadoop.io.SequenceFile;
  */
 public class FileReadOperation<T> extends SourceOperation<WindowedValue<T>> {
 
-  public FileReadOperation(int producerStageId, String fileName, Coder<T> 
coder) {
-    super(new FileBoundedSource<>(producerStageId, fileName, coder));
+  public FileReadOperation(
+      int producerStageId,
+      String fileName,
+      Coder<T> coder,
+      TupleTag<?> tupleTag) {
+    super(new FileBoundedSource<>(producerStageId, fileName, coder), tupleTag);
   }
 
   private static class FileBoundedSource<T> extends 
BoundedSource<WindowedValue<T>> {

http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java
index 468856a..af2e134 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java
@@ -24,7 +24,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
@@ -39,13 +38,10 @@ public class FileWriteOperation<T> extends Operation<T> {
   private final Coder<WindowedValue<T>> coder;
   private transient MultipleOutputs mos;
 
-  public FileWriteOperation(String fileName, Coder<T> coder) {
+  public FileWriteOperation(String fileName, Coder<WindowedValue<T>> coder) {
     super(0);
     this.fileName = checkNotNull(fileName, "fileName");
-    checkNotNull(coder, "coder");
-    // TODO: should not hard-code windows coder.
-    this.coder = WindowedValue.getFullCoder(
-        coder, WindowingStrategy.globalDefault().getWindowFn().windowCoder());
+    this.coder = checkNotNull(coder, "coder");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
index 13d215f..7c76823 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -17,9 +17,20 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.util.ArrayList;
 import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
  * Class that optimizes the initial graph to a fused graph.
@@ -39,21 +50,26 @@ public class GraphPlanner {
           continue;
         }
         Graphs.Step producer = fusedStep.getProducer(tag);
-        if (producer.getOperation() instanceof ViewOperation) {
+        if (producer.getOperation() instanceof FileWriteOperation) {
           continue;
         }
         String tagName = tag.getName();
         String fileName = tagName.replaceAll("[^A-Za-z0-9]", "0");
+
+        // TODO: should not hard-code windows coder.
+        WindowedValue.WindowedValueCoder<?> writeValueCoder = 
WindowedValue.getFullCoder(
+            tag.getCoder(), 
WindowingStrategy.globalDefault().getWindowFn().windowCoder());
+
         fusedStep.addStep(
             Graphs.Step.of(
                 tagName + "/Write",
-                new FileWriteOperation(fileName, tag.getCoder())),
+                new FileWriteOperation(fileName, writeValueCoder)),
             ImmutableList.of(tag),
             ImmutableList.<Graphs.Tag>of());
 
         String readStepName = tagName + "/Read";
         Graphs.Tag readOutput = Graphs.Tag.of(
-            readStepName + ".out", new TupleTag<>(), tag.getCoder());
+            readStepName + ".out", tag.getTupleTag(), tag.getCoder());
         for (Graphs.FusedStep consumer : consumers) {
           // Re-direct tag to readOutput.
           List<Graphs.Step> receivers = consumer.getConsumers(tag);
@@ -64,13 +80,40 @@ public class GraphPlanner {
           consumer.addStep(
               Graphs.Step.of(
                   readStepName,
-                  new FileReadOperation(fusedStep.getStageId(), fileName, 
tag.getCoder())),
+                  new FileReadOperation(
+                      fusedStep.getStageId(), fileName, tag.getCoder(), 
tag.getTupleTag())),
               ImmutableList.<Graphs.Tag>of(),
               ImmutableList.of(readOutput));
         }
       }
     }
 
+    // Insert PartitionOperation: replace the SourceOperation start steps of each fused step
+    // with a single step that holds all of their TaggedSources and routes by TupleTag.
+    for (final Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      // Snapshot, in case getStartSteps() is a live view that removeStep() below modifies.
+      List<Graphs.Step> readSteps = new ArrayList<>(fusedStep.getStartSteps());
+
+      List<SourceOperation.TaggedSource> sources = new ArrayList<>();
+      List<Graphs.Tag> readOutTags = new ArrayList<>();
+      StringBuilder partitionStepName = new StringBuilder();
+      for (Graphs.Step step : readSteps) {
+        checkState(step.getOperation() instanceof SourceOperation);
+        sources.add(((SourceOperation) step.getOperation()).getTaggedSource());
+        readOutTags.add(Iterables.getOnlyElement(fusedStep.getOutputTags(step)));
+        // Separator after each name; only the trailing one is dropped by deleteCharAt() below.
+        partitionStepName.append(step.getFullName()).append('+');
+
+        fusedStep.removeStep(step);
+      }
+      if (partitionStepName.length() > 0) {
+        partitionStepName.deleteCharAt(partitionStepName.length() - 1);
+      }
+
+      Graphs.Step partitionStep =
+          Graphs.Step.of(partitionStepName.toString(), new PartitionOperation(sources));
+      fusedStep.addStep(partitionStep, ImmutableList.<Graphs.Tag>of(), readOutTags);
+    }
     return fusedGraph;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
index 97b5441..9743d09 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
@@ -81,7 +81,7 @@ public class Graphs {
         Graphs.Step step,
         List<Graphs.Tag> inTags,
         List<Graphs.Tag> outTags) {
-      if (step.getOperation() instanceof ViewOperation) {
+      if (step.getOperation() instanceof FileWriteOperation) {
         return false;
       }
       if (outTags.size() != 1) {

http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index 1016e22..677f3a7 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.mapreduce.translation;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
@@ -30,7 +28,6 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -81,23 +78,14 @@ public class JobPrototype {
         String.format("/tmp/mapreduce/stage-%d", fusedStep.getStageId()));
 
     // Setup BoundedSources in BeamInputFormat.
-    // TODO: support more than one read steps by introducing a composed 
BeamInputFormat
-    // and a partition operation.
-    List<Graphs.Step> readSteps = fusedStep.getStartSteps();
-    ArrayList<BoundedSource<?>> sources = new ArrayList<>();
-    sources.addAll(
-        FluentIterable.from(readSteps)
-            .transform(new Function<Graphs.Step, BoundedSource<?>>() {
-              @Override
-              public BoundedSource<?> apply(Graphs.Step step) {
-                checkState(step.getOperation() instanceof SourceOperation);
-                return ((SourceOperation) step.getOperation()).getSource();
-              }})
-            .toList());
+    Graphs.Step startStep = 
Iterables.getOnlyElement(fusedStep.getStartSteps());
+    checkState(startStep.getOperation() instanceof PartitionOperation);
+    PartitionOperation partitionOperation = (PartitionOperation) 
startStep.getOperation();
 
     conf.set(
         BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE,
-        
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(sources)));
+        Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
+            new ArrayList<>(partitionOperation.getTaggedSources()))));
     conf.set(
         BeamInputFormat.BEAM_SERIALIZED_PIPELINE_OPTIONS,
         Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
@@ -151,16 +139,14 @@ public class JobPrototype {
     }
 
     // Setup DoFns in BeamMapper.
-    Graphs.Tag readOutputTag = 
Iterables.getOnlyElement(fusedStep.getOutputTags(readSteps.get(0)));
-    Graphs.Step mapperStartStep = 
Iterables.getOnlyElement(fusedStep.getConsumers(readOutputTag));
-    chainOperations(mapperStartStep, fusedStep);
+    chainOperations(startStep, fusedStep);
 
     job.setMapOutputKeyClass(BytesWritable.class);
     job.setMapOutputValueClass(byte[].class);
     conf.set(
         BeamMapper.BEAM_PAR_DO_OPERATION_MAPPER,
         Base64.encodeBase64String(
-            
SerializableUtils.serializeToByteArray(mapperStartStep.getOperation())));
+            SerializableUtils.serializeToByteArray(startStep.getOperation())));
     job.setMapperClass(BeamMapper.class);
     job.setOutputFormatClass(TextOutputFormat.class);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
index 574f152..7504e1c 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
@@ -76,6 +76,7 @@ public abstract class Operation<T> implements Serializable {
   }
 
   public List<OutputReceiver> getOutputReceivers() {
+    // TODO: avoid allocating objects for each output emit.
     return ImmutableList.copyOf(receivers);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
new file mode 100644
index 0000000..b8aefd6
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Operation that partitions input elements based on their {@link TupleTag} keys.
+ *
+ * <p>Each input element is a {@code KV} keyed by the {@link TupleTag} of the source it came
+ * from. Its value is forwarded to the output receiver whose index is the position of that tag
+ * among the tags of the {@link SourceOperation.TaggedSource}s given to the constructor.
+ */
+public class PartitionOperation extends Operation<KV<TupleTag<?>, Object>> {
+
+  private final List<SourceOperation.TaggedSource> sources;
+  private final List<TupleTag<?>> tupleTags;
+
+  /**
+   * Creates a partition operation with one output per tagged source.
+   *
+   * @param sources the tagged sources; output index {@code i} corresponds to
+   *     {@code sources.get(i).getTag()}. The list is copied defensively.
+   */
+  public PartitionOperation(List<SourceOperation.TaggedSource> sources) {
+    // Null-check before calling size() so a null argument fails with a descriptive message.
+    super(checkNotNull(sources, "sources").size());
+    this.sources = ImmutableList.copyOf(sources);
+    this.tupleTags = FluentIterable.from(this.sources)
+        .transform(new Function<SourceOperation.TaggedSource, TupleTag<?>>() {
+          @Override
+          public TupleTag<?> apply(SourceOperation.TaggedSource input) {
+            return input.getTag();
+          }})
+        .toList();
+  }
+
+  /** Returns the tagged sources, in output-index order. */
+  public List<SourceOperation.TaggedSource> getTaggedSources() {
+    return sources;
+  }
+
+  /**
+   * Forwards the element's value to the output receiver selected by its {@link TupleTag} key.
+   *
+   * @throws IllegalStateException if the key is not one of this operation's tags
+   */
+  @Override
+  public void process(WindowedValue<KV<TupleTag<?>, Object>> elem) throws IOException,
+      InterruptedException {
+    TupleTag<?> tupleTag = elem.getValue().getKey();
+    int outputIndex = getOutputIndex(tupleTag);
+    OutputReceiver receiver = getOutputReceivers().get(outputIndex);
+    // The KV value is expected to already be a WindowedValue; it is forwarded as-is.
+    receiver.process((WindowedValue<?>) elem.getValue().getValue());
+  }
+
+  @Override
+  protected int getOutputIndex(TupleTag<?> tupleTag) {
+    int index = tupleTags.indexOf(tupleTag);
+    // The message template is only formatted when the check fails.
+    checkState(index >= 0, "Cannot find index for tuple tag: %s.", tupleTag);
+    return index;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
index 86ee78a..e93986b 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
@@ -27,7 +27,8 @@ class ReadBoundedTranslator<T> extends 
TransformTranslator.Default<Read.Bounded<
   public void translateNode(Read.Bounded transform, TranslationContext 
context) {
     TranslationContext.UserGraphContext userGraphContext = 
context.getUserGraphContext();
 
-    SourceOperation operation = new SourceOperation(transform.getSource());
+    SourceOperation operation =
+        new SourceOperation(transform.getSource(), 
userGraphContext.getOnlyOutputTag());
     context.addInitStep(
         Graphs.Step.of(userGraphContext.getStepName(), operation),
         userGraphContext.getInputTags(),

http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java
index 2163f34..4ac850f 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java
@@ -19,18 +19,23 @@ package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * A Read.Bounded place holder {@link Operation} during pipeline translation.
  */
 class SourceOperation<T> extends Operation<T> {
-  private final BoundedSource<T> source;
+  private final TaggedSource source;
 
-  SourceOperation(BoundedSource<T> source) {
+  SourceOperation(BoundedSource<?> boundedSource, TupleTag<?> tupleTag) {
     super(1);
-    this.source = checkNotNull(source, "source");
+    checkNotNull(boundedSource, "boundedSource");
+    checkNotNull(tupleTag, "tupleTag");
+    this.source = TaggedSource.of(boundedSource, tupleTag);
   }
 
   @Override
@@ -39,7 +44,18 @@ class SourceOperation<T> extends Operation<T> {
         String.format("%s should not in execution graph.", 
this.getClass().getSimpleName()));
   }
 
-  BoundedSource<?> getSource() {
+  TaggedSource getTaggedSource() {
     return source;
   }
+
+  @AutoValue
+  abstract static class TaggedSource implements Serializable {
+    abstract BoundedSource<?> getSource();
+    abstract TupleTag<?> getTag();
+
+    static TaggedSource of(BoundedSource<?> boundedSource, TupleTag<?> 
tupleTag) {
+      return new org.apache.beam.runners.mapreduce.translation
+          .AutoValue_SourceOperation_TaggedSource(boundedSource, tupleTag);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java
deleted file mode 100644
index 093f00e..0000000
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.mapreduce.translation;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.Throwables;
-import java.io.ByteArrayOutputStream;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-/**
- * {@link Operation} that materializes views.
- */
-public class ViewOperation<T> extends Operation<T> {
-
-  private final Coder<WindowedValue<T>> valueCoder;
-
-  private transient TaskInputOutputContext<Object, Object, Object, Object> 
taskContext;
-
-  public ViewOperation(Coder<WindowedValue<T>> valueCoder) {
-    super(0);
-    this.valueCoder = checkNotNull(valueCoder, "valueCoder");
-  }
-
-  @Override
-  public void start(TaskInputOutputContext<Object, Object, Object, Object> 
taskContext) {
-    this.taskContext = checkNotNull(taskContext, "taskContext");
-  }
-
-  @Override
-  public void process(WindowedValue<T> elem) {
-    try {
-      ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
-      valueCoder.encode(elem, valueStream);
-      taskContext.write(new BytesWritable("view".getBytes()), 
valueStream.toByteArray());
-    } catch (Exception e) {
-      Throwables.throwIfUnchecked(e);
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
index d5eac73..dfa18c8 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
-import org.apache.beam.sdk.coders.Coder;
+import com.google.common.collect.Iterables;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
- * Translates a {@link View.CreatePCollectionView} to a {@link ViewOperation}.
+ * Translates a {@link View.CreatePCollectionView} to a {@link 
FileWriteOperation}.
  */
 public class ViewTranslator extends 
TransformTranslator.Default<View.CreatePCollectionView<?, ?>> {
 
@@ -30,8 +33,16 @@ public class ViewTranslator extends 
TransformTranslator.Default<View.CreatePColl
       View.CreatePCollectionView<?, ?> transform, TranslationContext context) {
     TranslationContext.UserGraphContext userGraphContext = 
context.getUserGraphContext();
 
-    ViewOperation<?> operation =
-        new ViewOperation<>((Coder) 
transform.getView().getPCollection().getCoder());
+    PCollection<?> inPCollection = transform.getView().getPCollection();
+    WindowingStrategy<?, ?> windowingStrategy = 
inPCollection.getWindowingStrategy();
+
+    Graphs.Tag outTag = 
Iterables.getOnlyElement(userGraphContext.getOutputTags());
+    String fileName = outTag.getName().replaceAll("[^A-Za-z0-9]", "0");
+
+    FileWriteOperation<?> operation = new FileWriteOperation<>(
+        fileName,
+        WindowedValue.getFullCoder(
+            inPCollection.getCoder(), 
windowingStrategy.getWindowFn().windowCoder()));
     context.addInitStep(
         Graphs.Step.of(userGraphContext.getStepName(), operation),
         userGraphContext.getInputTags(),

Reply via email to