mr-runner: set up file paths for the read and write sides of materialization.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b87ae78b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b87ae78b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b87ae78b

Branch: refs/heads/mr-runner
Commit: b87ae78b5e9c204e03c01b986abf8dc185b6a9ef
Parents: 0ebd14c
Author: Pei He <p...@apache.org>
Authored: Tue Aug 8 22:07:12 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Thu Aug 31 14:13:49 2017 +0800

----------------------------------------------------------------------
 runners/map-reduce/pom.xml                      |  5 ++
 .../mapreduce/MapReducePipelineOptions.java     |  7 ++-
 .../beam/runners/mapreduce/MapReduceRunner.java |  8 +--
 .../mapreduce/translation/BeamInputFormat.java  | 20 +++----
 .../translation/ConfigurationUtils.java         | 52 +++++++++++++++++
 .../translation/FileReadOperation.java          | 41 ++++++++-----
 .../translation/FileSideInputReader.java        | 41 ++++++-------
 .../mapreduce/translation/GraphPlanner.java     | 55 +++++++++++++-----
 .../GroupAlsoByWindowsParDoOperation.java       |  1 -
 .../mapreduce/translation/JobPrototype.java     | 29 +++++++---
 .../mapreduce/translation/Operation.java        |  6 ++
 .../mapreduce/translation/ParDoOperation.java   | 26 ++++++++-
 .../translation/PartitionOperation.java         | 20 +++----
 .../translation/ReadBoundedTranslator.java      |  6 +-
 .../mapreduce/translation/ReadOperation.java    | 57 ++++++++++++++++++
 .../ReifyTimestampAndWindowsParDoOperation.java |  2 -
 .../translation/SerializableConfiguration.java  | 52 +++++++++++++++++
 .../mapreduce/translation/SourceOperation.java  | 61 --------------------
 .../translation/SourceReadOperation.java        | 42 ++++++++++++++
 .../mapreduce/translation/ViewTranslator.java   |  2 +-
 .../mapreduce/translation/GraphPlannerTest.java |  2 +-
 21 files changed, 370 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/pom.xml
----------------------------------------------------------------------
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index 06e5227..e858031 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -113,6 +113,11 @@
       <artifactId>beam-runners-core-construction-java</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
+    </dependency>
+
     <!-- Module dependencies -->    
     <dependency>
         <groupId>com.google.auto.service</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index 9224eb6..cfbc006 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -43,9 +43,10 @@ public interface MapReducePipelineOptions extends 
PipelineOptions {
   Class<?> getJarClass();
   void setJarClass(Class<?> jarClass);
 
-  @Description("The jar class of the user Beam program.")
-  String getTmpDir();
-  void setTmpDir(String tmpDir);
+  @Description("The directory for files output.")
+  @Default.String("/tmp/mapreduce/")
+  String getFileOutputDir();
+  void setFileOutputDir(String fileOutputDir);
 
   class JarClassInstanceFactory implements DefaultValueFactory<Class<?>> {
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index a7e75bb..3f76808 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -69,18 +69,18 @@ public class MapReduceRunner extends 
PipelineRunner<PipelineResult> {
     Graphs.FusedGraph fusedGraph = new 
Graphs.FusedGraph(context.getInitGraph());
     LOG.info(DotfileWriter.toDotfile(fusedGraph));
 
-    GraphPlanner planner = new GraphPlanner();
+    GraphPlanner planner = new GraphPlanner(options);
     fusedGraph = planner.plan(fusedGraph);
 
     LOG.info(DotfileWriter.toDotfile(fusedGraph));
 
-    Configuration config = new Configuration();
-    config.set("keep.failed.task.files", "true");
-
     fusedGraph.getFusedSteps();
 
     int stageId = 0;
     for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      Configuration config = new Configuration();
+      config.set("keep.failed.task.files", "true");
+
       JobPrototype jobPrototype = JobPrototype.create(stageId++, fusedStep, 
options);
       LOG.info("Running job-{}.", stageId);
       LOG.info(DotfileWriter.toDotfile(fusedStep));

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
index 23534de..10d9ada 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -53,7 +53,7 @@ public class BeamInputFormat<T> extends InputFormat {
 
   private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 
* 1000;
 
-  private List<SourceOperation.TaggedSource> sources;
+  private List<ReadOperation.TaggedSource> sources;
   private SerializedPipelineOptions options;
 
   public BeamInputFormat() {
@@ -68,7 +68,7 @@ public class BeamInputFormat<T> extends InputFormat {
         || Strings.isNullOrEmpty(serializedPipelineOptions)) {
       return ImmutableList.of();
     }
-    sources = (List<SourceOperation.TaggedSource>) 
SerializableUtils.deserializeFromByteArray(
+    sources = (List<ReadOperation.TaggedSource>) 
SerializableUtils.deserializeFromByteArray(
         Base64.decodeBase64(serializedBoundedSource), "TaggedSources");
     options = ((SerializedPipelineOptions) 
SerializableUtils.deserializeFromByteArray(
         Base64.decodeBase64(serializedPipelineOptions), 
"SerializedPipelineOptions"));
@@ -77,17 +77,17 @@ public class BeamInputFormat<T> extends InputFormat {
 
       return FluentIterable.from(sources)
           .transformAndConcat(
-              new Function<SourceOperation.TaggedSource, 
Iterable<SourceOperation.TaggedSource>>() {
+              new Function<ReadOperation.TaggedSource, 
Iterable<ReadOperation.TaggedSource>>() {
                 @Override
-                public Iterable<SourceOperation.TaggedSource> apply(
-                    final SourceOperation.TaggedSource taggedSource) {
+                public Iterable<ReadOperation.TaggedSource> apply(
+                    final ReadOperation.TaggedSource taggedSource) {
                   try {
                     return FluentIterable.from(taggedSource.getSource().split(
                         DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, 
options.getPipelineOptions()))
-                        .transform(new Function<BoundedSource<?>, 
SourceOperation.TaggedSource>() {
+                        .transform(new Function<BoundedSource<?>, 
ReadOperation.TaggedSource>() {
                           @Override
-                          public SourceOperation.TaggedSource 
apply(BoundedSource<?> input) {
-                            return SourceOperation.TaggedSource.of(input, 
taggedSource.getTag());
+                          public ReadOperation.TaggedSource 
apply(BoundedSource<?> input) {
+                            return ReadOperation.TaggedSource.of(input, 
taggedSource.getTag());
                           }});
                   } catch (Exception e) {
                     Throwables.throwIfUnchecked(e);
@@ -95,9 +95,9 @@ public class BeamInputFormat<T> extends InputFormat {
                   }
                 }
               })
-          .transform(new Function<SourceOperation.TaggedSource, InputSplit>() {
+          .transform(new Function<ReadOperation.TaggedSource, InputSplit>() {
             @Override
-            public InputSplit apply(SourceOperation.TaggedSource taggedSource) 
{
+            public InputSplit apply(ReadOperation.TaggedSource taggedSource) {
               return new BeamInputSplit(taggedSource.getSource(), options, 
taggedSource.getTag());
             }})
           .toList();

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
new file mode 100644
index 0000000..6d7a81a
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Utilities for deriving the file locations used to materialize data between
+ * MapReduce stages, either from a Hadoop {@link Configuration} or from a base
+ * output directory plus a stage id.
+ */
+public class ConfigurationUtils {
+
+  /**
+   * Resolves {@code fileName} as a file inside the directory stored in {@code conf}
+   * under {@link FileOutputFormat#OUTDIR} (matched as a new directory resource).
+   *
+   * <p>NOTE(review): {@code conf.get(FileOutputFormat.OUTDIR)} returns null when the key is
+   * unset, and this method does not guard against that. It also treats {@code fileName} as
+   * relative to OUTDIR. However, FileReadOperation is constructed (in GraphPlanner) with the
+   * full path built by {@link #getFileOutputPath}, and its reader passes that path here
+   * together with the reading job's configuration. JobPrototype.build sets OUTDIR to the
+   * reading stage's directory, not the producing stage's. Confirm this is intended.
+   */
+  public static ResourceId getResourceIdForOutput(String fileName, 
Configuration conf) {
+    ResourceId outDir = 
FileSystems.matchNewResource(conf.get(FileOutputFormat.OUTDIR), true);
+    return outDir.resolve(fileName, 
ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+  }
+
+  /**
+   * Returns the output directory for stage {@code stageId}.
+   *
+   * <p>The result is {@code baseFileOutputDir} followed by {@code stage-<stageId>}. A '/'
+   * separator is inserted only when the base does not already end with one, so both
+   * "/tmp/mapreduce" and "/tmp/mapreduce/" yield "/tmp/mapreduce/stage-N".
+   */
+  public static String getFileOutputDir(String baseFileOutputDir, int stageId) 
{
+    if (baseFileOutputDir.endsWith("/")) {
+      return String.format("%sstage-%d", baseFileOutputDir, stageId);
+    } else {
+      return String.format("%s/stage-%d", baseFileOutputDir, stageId);
+    }
+  }
+
+  /**
+   * Returns the path of {@code fileName} inside the output directory of stage
+   * {@code stageId}, as computed by {@link #getFileOutputDir}, joined with a '/'.
+   */
+  public static String getFileOutputPath(String baseFileOutputDir, int 
stageId, String fileName) {
+    return String.format("%s/%s", getFileOutputDir(baseFileOutputDir, 
stageId), fileName);
+  }
+
+  /**
+   * Converts a tag name into a file-name-safe string by replacing every character
+   * outside {@code [A-Za-z0-9]} with {@code '0'}.
+   *
+   * <p>NOTE(review): this mapping is not injective. For example, "a.b", "a/b" and "a0b"
+   * all map to "a0b", so two distinct tags could share a materialization file. Confirm
+   * that tag names cannot collide this way.
+   */
+  public static String toFileName(String tagName) {
+    return tagName.replaceAll("[^A-Za-z0-9]", "0");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
index 70263c3..a95e79e 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
@@ -44,29 +44,41 @@ import org.apache.hadoop.io.SequenceFile;
 /**
  * Operation that reads from files.
  */
-public class FileReadOperation<T> extends SourceOperation<WindowedValue<T>> {
+public class FileReadOperation<T> extends ReadOperation<WindowedValue<T>> {
+
+  private final String fileName;
+  private final Coder<?> coder;
+  private final TupleTag<?> tupleTag;
 
   public FileReadOperation(
-      int producerStageId,
       String fileName,
       Coder<T> coder,
       TupleTag<?> tupleTag) {
-    super(new FileBoundedSource<>(producerStageId, fileName, coder), tupleTag);
+    super();
+    this.fileName = checkNotNull(fileName, "fileName");
+    this.coder = checkNotNull(coder, "coder");
+    this.tupleTag = checkNotNull(tupleTag, "tupleTag");
+  }
+
+  @Override
+  TaggedSource getTaggedSource(Configuration conf) {
+    return TaggedSource.of(
+        new FileBoundedSource<>(fileName, coder, new 
SerializableConfiguration(conf)),
+        tupleTag);
   }
 
   private static class FileBoundedSource<T> extends 
BoundedSource<WindowedValue<T>> {
 
-    private final int producerStageId;
     private final String fileName;
     private final Coder<WindowedValue<T>> coder;
+    private final SerializableConfiguration conf;
 
-    FileBoundedSource(int producerStageId, String fileName, Coder<T> coder) {
-      this.producerStageId = producerStageId;
+    FileBoundedSource(String fileName, Coder<T> coder, 
SerializableConfiguration conf) {
       this.fileName = checkNotNull(fileName, "fileName");
       checkNotNull(coder, "coder");
       this.coder = WindowedValue.getFullCoder(
           coder, 
WindowingStrategy.globalDefault().getWindowFn().windowCoder());
-
+      this.conf = checkNotNull(conf, "conf");
     }
 
     @Override
@@ -84,18 +96,15 @@ public class FileReadOperation<T> extends 
SourceOperation<WindowedValue<T>> {
     @Override
     public BoundedReader<WindowedValue<T>> createReader(PipelineOptions 
options)
         throws IOException {
-      Path pattern = new Path(String.format("/tmp/mapreduce/stage-2/%s*", 
fileName));
-      // TODO: use config from the job.
-      Configuration conf = new Configuration();
-      conf.set(
-          "io.serializations",
-          "org.apache.hadoop.io.serializer.WritableSerialization,"
-              + "org.apache.hadoop.io.serializer.JavaSerialization");
-      FileSystem fs = pattern.getFileSystem(conf);
+      Path pattern = new Path(
+          ConfigurationUtils.getResourceIdForOutput(fileName, conf.getConf()) 
+ "*");
+
+      FileSystem fs = pattern.getFileSystem(conf.getConf());
       FileStatus[] files = fs.globStatus(pattern);
+
       Queue<SequenceFile.Reader> readers = new LinkedList<>();
       for (FileStatus f : files) {
-        readers.add(new SequenceFile.Reader(fs, f.getPath(), conf));
+        readers.add(new SequenceFile.Reader(fs, f.getPath(), conf.getConf()));
       }
       return new Reader<>(this, readers, coder);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
index 18bff2a..cb3a8c4 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
@@ -17,10 +17,11 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -47,22 +48,23 @@ import org.apache.hadoop.io.SequenceFile;
  */
 public class FileSideInputReader implements SideInputReader {
 
-  private final Map<TupleTag<?>, String> tupleTagToFileName;
+  private final Map<TupleTag<?>, String> tupleTagToFilePath;
   private final Map<TupleTag<?>, Coder<?>> tupleTagToCoder;
+  private final Configuration conf;
 
-  public FileSideInputReader(List<Graphs.Tag> sideInputTags) {
-    this.tupleTagToFileName = Maps.newHashMap();
-    this.tupleTagToCoder = Maps.newHashMap();
-    for (Graphs.Tag tag : sideInputTags) {
-      tupleTagToFileName.put(tag.getTupleTag(), toFileName(tag.getName()));
-      tupleTagToCoder.put(tag.getTupleTag(), tag.getCoder());
-    }
+  public FileSideInputReader(
+      Map<TupleTag<?>, String> tupleTagToFilePath,
+      Map<TupleTag<?>, Coder<?>> tupleTagToCoder,
+      Configuration conf) {
+    this.tupleTagToFilePath = checkNotNull(tupleTagToFilePath, 
"tupleTagToFilePath");
+    this.tupleTagToCoder = checkNotNull(tupleTagToCoder, "tupleTagToCoder");
+    this.conf = checkNotNull(conf, "conf");
   }
 
   @Nullable
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
-    String fileName = tupleTagToFileName.get(view.getTagInternal());
+    String filePath = tupleTagToFilePath.get(view.getTagInternal());
     IterableCoder<WindowedValue<?>> coder =
         (IterableCoder<WindowedValue<?>>) 
tupleTagToCoder.get(view.getTagInternal());
     Coder<WindowedValue<?>> elemCoder = coder.getElemCoder();
@@ -70,16 +72,11 @@ public class FileSideInputReader implements SideInputReader 
{
     final BoundedWindow sideInputWindow =
         view.getWindowMappingFn().getSideInputWindow(window);
 
-    Path pattern = new Path(String.format("/tmp/mapreduce/stage-1/%s*", 
fileName));
-    Configuration conf = new Configuration();
-    conf.set(
-        "io.serializations",
-        "org.apache.hadoop.io.serializer.WritableSerialization,"
-            + "org.apache.hadoop.io.serializer.JavaSerialization");
+    Path pattern = new Path(filePath + "*");
     try {
-      FileSystem fs;
-      fs = pattern.getFileSystem(conf);
+      FileSystem fs = pattern.getFileSystem(conf);
       FileStatus[] files = fs.globStatus(pattern);
+      // TODO: handle empty views which may result in no files case.
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, 
files[0].getPath(), conf);
 
       List<WindowedValue<?>> availableSideInputs = new ArrayList<>();
@@ -114,15 +111,11 @@ public class FileSideInputReader implements 
SideInputReader {
 
   @Override
   public <T> boolean contains(PCollectionView<T> view) {
-    return tupleTagToFileName.containsKey(view.getTagInternal());
+    return tupleTagToFilePath.containsKey(view.getTagInternal());
   }
 
   @Override
   public boolean isEmpty() {
-    return tupleTagToFileName.isEmpty();
-  }
-
-  private String toFileName(String tagName) {
-    return tagName.replaceAll("[^A-Za-z0-9]", "0");
+    return tupleTagToFilePath.isEmpty();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
index 7c76823..b6e134e 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -17,17 +17,16 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
+import java.util.Map;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -37,8 +36,10 @@ import org.apache.beam.sdk.values.WindowingStrategy;
  */
 public class GraphPlanner {
 
+  private final MapReducePipelineOptions options;
 
-  public GraphPlanner() {
+  public GraphPlanner(MapReducePipelineOptions options) {
+    this.options = checkNotNull(options, "options");
   }
 
   public Graphs.FusedGraph plan(Graphs.FusedGraph fusedGraph) {
@@ -54,7 +55,7 @@ public class GraphPlanner {
           continue;
         }
         String tagName = tag.getName();
-        String fileName = tagName.replaceAll("[^A-Za-z0-9]", "0");
+        String fileName = ConfigurationUtils.toFileName(tagName);
 
         // TODO: should not hard-code windows coder.
         WindowedValue.WindowedValueCoder<?> writeValueCoder = 
WindowedValue.getFullCoder(
@@ -77,11 +78,13 @@ public class GraphPlanner {
             consumer.addEdge(readOutput, step);
           }
           consumer.removeTag(tag);
+
+          String filePath = ConfigurationUtils.getFileOutputPath(
+              options.getFileOutputDir(), fusedStep.getStageId(), fileName);
           consumer.addStep(
               Graphs.Step.of(
                   readStepName,
-                  new FileReadOperation(
-                      fusedStep.getStageId(), fileName, tag.getCoder(), 
tag.getTupleTag())),
+                  new FileReadOperation(filePath, tag.getCoder(), 
tag.getTupleTag())),
               ImmutableList.<Graphs.Tag>of(),
               ImmutableList.of(readOutput));
         }
@@ -92,13 +95,13 @@ public class GraphPlanner {
     for (final Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
       List<Graphs.Step> readSteps = fusedStep.getStartSteps();
 
-      List<SourceOperation.TaggedSource> sources = new ArrayList<>();
+      List<ReadOperation> readOperations = new ArrayList<>();
       List<Graphs.Tag> readOutTags = new ArrayList<>();
       List<TupleTag<?>> readOutTupleTags = new ArrayList<>();
       StringBuilder partitionStepName = new StringBuilder();
       for (Graphs.Step step : readSteps) {
-        checkState(step.getOperation() instanceof SourceOperation);
-        sources.add(((SourceOperation) step.getOperation()).getTaggedSource());
+        checkState(step.getOperation() instanceof ReadOperation);
+        readOperations.add(((ReadOperation) step.getOperation()));
         Graphs.Tag tag = 
Iterables.getOnlyElement(fusedStep.getOutputTags(step));
         readOutTags.add(tag);
         readOutTupleTags.add(tag.getTupleTag());
@@ -110,10 +113,34 @@ public class GraphPlanner {
         partitionStepName.deleteCharAt(partitionStepName.length() - 1);
       }
 
-      Graphs.Step partitionStep =
-          Graphs.Step.of(partitionStepName.toString(), new 
PartitionOperation(sources));
+      Graphs.Step partitionStep = Graphs.Step.of(
+          partitionStepName.toString(), new PartitionOperation(readOperations, 
readOutTupleTags));
       fusedStep.addStep(partitionStep, ImmutableList.<Graphs.Tag>of(), 
readOutTags);
     }
+
+    // Setup side inputs
+    for (final Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      for (Graphs.Step step : fusedStep.getSteps()) {
+        if (!(step.getOperation() instanceof ParDoOperation)) {
+          continue;
+        }
+        ParDoOperation parDo = (ParDoOperation) step.getOperation();
+        List<Graphs.Tag> sideInputTags = parDo.getSideInputTags();
+        if (sideInputTags.size() == 0) {
+          continue;
+        }
+        Map<TupleTag<?>, String> tupleTagToFilePath = Maps.newHashMap();
+        for (Graphs.Tag sideInTag : sideInputTags) {
+          tupleTagToFilePath.put(
+              sideInTag.getTupleTag(),
+              ConfigurationUtils.getFileOutputPath(
+                  options.getFileOutputDir(),
+                  fusedGraph.getProducer(sideInTag).getStageId(),
+                  ConfigurationUtils.toFileName(sideInTag.getName())));
+        }
+        parDo.setupSideInput(tupleTagToFilePath);
+      }
+    }
     return fusedGraph;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
index 471c7f5..768f17c 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.mapreduce.translation;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.ImmutableList;
-import java.util.List;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index 677f3a7..9f291d5 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -20,15 +20,17 @@ package org.apache.beam.runners.mapreduce.translation;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -49,23 +51,23 @@ import 
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 public class JobPrototype {
 
   public static JobPrototype create(
-      int stageId, Graphs.FusedStep fusedStep, PipelineOptions options) {
+      int stageId, Graphs.FusedStep fusedStep, MapReducePipelineOptions 
options) {
     return new JobPrototype(stageId, fusedStep, options);
   }
 
   private final int stageId;
   private final Graphs.FusedStep fusedStep;
-  private final PipelineOptions options;
+  private final MapReducePipelineOptions options;
 
-  private JobPrototype(int stageId, Graphs.FusedStep fusedStep, 
PipelineOptions options) {
+  private JobPrototype(int stageId, Graphs.FusedStep fusedStep, 
MapReducePipelineOptions options) {
     this.stageId = stageId;
     this.fusedStep = checkNotNull(fusedStep, "fusedStep");
     this.options = checkNotNull(options, "options");
   }
 
-  public Job build(Class<?> jarClass, Configuration conf) throws IOException {
-    Job job = new Job(conf);
-    conf = job.getConfiguration();
+  public Job build(Class<?> jarClass, Configuration initConf) throws 
IOException {
+    Job job = new Job(initConf);
+    final Configuration conf = job.getConfiguration();
     job.setJarByClass(jarClass);
     conf.set(
         "io.serializations",
@@ -75,17 +77,26 @@ public class JobPrototype {
     //TODO: config out dir with PipelineOptions.
     conf.set(
         FileOutputFormat.OUTDIR,
-        String.format("/tmp/mapreduce/stage-%d", fusedStep.getStageId()));
+        ConfigurationUtils.getFileOutputDir(options.getFileOutputDir(), 
fusedStep.getStageId()));
 
     // Setup BoundedSources in BeamInputFormat.
     Graphs.Step startStep = 
Iterables.getOnlyElement(fusedStep.getStartSteps());
     checkState(startStep.getOperation() instanceof PartitionOperation);
     PartitionOperation partitionOperation = (PartitionOperation) 
startStep.getOperation();
 
+    ArrayList<ReadOperation.TaggedSource> taggedSources = new ArrayList<>();
+    taggedSources.addAll(FluentIterable.from(partitionOperation
+        .getReadOperations())
+        .transform(new Function<ReadOperation, ReadOperation.TaggedSource>() {
+          @Override
+          public ReadOperation.TaggedSource apply(ReadOperation operation) {
+            return operation.getTaggedSource(conf);
+          }})
+        .toList());
     conf.set(
         BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE,
         Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
-            new ArrayList<>(partitionOperation.getTaggedSources()))));
+            taggedSources)));
     conf.set(
         BeamInputFormat.BEAM_SERIALIZED_PIPELINE_OPTIONS,
         Base64.encodeBase64String(SerializableUtils.serializeToByteArray(

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
index 7504e1c..bd24f05 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
  */
 public abstract class Operation<T> implements Serializable {
   private final OutputReceiver[] receivers;
+  private SerializableConfiguration conf;
 
   public Operation(int numOutputs) {
     this.receivers = new OutputReceiver[numOutputs];
@@ -44,6 +45,7 @@ public abstract class Operation<T> implements Serializable {
    * <p>Called after all successors consuming operations have been started.
    */
   public void start(TaskInputOutputContext<Object, Object, Object, Object> 
taskContext) {
+    conf = new SerializableConfiguration(taskContext.getConfiguration());
     for (OutputReceiver receiver : receivers) {
       if (receiver == null) {
         continue;
@@ -75,6 +77,10 @@ public abstract class Operation<T> implements Serializable {
     }
   }
 
+  public SerializableConfiguration getConf() {
+    return conf;
+  }
+
   public List<OutputReceiver> getOutputReceivers() {
     // TODO: avoid allocating objects for each output emit.
     return ImmutableList.copyOf(receivers);

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
index 020bfbe..947d773 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
@@ -20,10 +20,14 @@ package org.apache.beam.runners.mapreduce.translation;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.collect.Maps;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -40,8 +44,10 @@ public abstract class ParDoOperation<InputT, OutputT> 
extends Operation<InputT>
   protected final SerializedPipelineOptions options;
   protected final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> sideOutputTags;
-  private final List<Graphs.Tag> sideInputTags;
   protected final WindowingStrategy<?, ?> windowingStrategy;
+  private final List<Graphs.Tag> sideInputTags;
+  private Map<TupleTag<?>, String> tupleTagToFilePath;
+
 
   protected DoFnInvoker<InputT, OutputT> doFnInvoker;
   private DoFnRunner<InputT, OutputT> fnRunner;
@@ -56,8 +62,8 @@ public abstract class ParDoOperation<InputT, OutputT> extends 
Operation<InputT>
     this.options = new SerializedPipelineOptions(checkNotNull(options, 
"options"));
     this.mainOutputTag = checkNotNull(mainOutputTag, "mainOutputTag");
     this.sideOutputTags = checkNotNull(sideOutputTags, "sideOutputTags");
-    this.sideInputTags = checkNotNull(sideInputTags, "sideInputTags");
     this.windowingStrategy = checkNotNull(windowingStrategy, 
"windowingStrategy");
+    this.sideInputTags = checkNotNull(sideInputTags, "sideInputTags");
   }
 
   /**
@@ -73,10 +79,16 @@ public abstract class ParDoOperation<InputT, OutputT> 
extends Operation<InputT>
     doFnInvoker = DoFnInvokers.invokerFor(doFn);
     doFnInvoker.invokeSetup();
 
+    Map<TupleTag<?>, Coder<?>> tupleTagToCoder = Maps.newHashMap();
+    for (Graphs.Tag tag : sideInputTags) {
+      tupleTagToCoder.put(tag.getTupleTag(), tag.getCoder());
+    }
     fnRunner = DoFnRunners.simpleRunner(
         options.getPipelineOptions(),
         getDoFn(),
-        new FileSideInputReader(sideInputTags),
+        sideInputTags.isEmpty()
+            ? NullSideInputReader.empty() :
+            new FileSideInputReader(tupleTagToFilePath, tupleTagToCoder, 
getConf().getConf()),
         createOutputManager(),
         mainOutputTag,
         sideOutputTags,
@@ -100,6 +112,14 @@ public abstract class ParDoOperation<InputT, OutputT> 
extends Operation<InputT>
     super.finish();
   }
 
+  public void setupSideInput(Map<TupleTag<?>, String> tupleTagToFilePath) {
+    this.tupleTagToFilePath = checkNotNull(tupleTagToFilePath, 
"tupleTagToFilePath");
+  }
+
+  public List<Graphs.Tag> getSideInputTags() {
+    return sideInputTags;
+  }
+
   @Override
   protected int getOutputIndex(TupleTag<?> tupleTag) {
     if (tupleTag == mainOutputTag) {

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
index b8aefd6..687b5b9 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
@@ -33,23 +33,17 @@ import org.apache.beam.sdk.values.TupleTag;
  */
 public class PartitionOperation extends Operation<KV<TupleTag<?>, Object>> {
 
-  private final List<SourceOperation.TaggedSource> sources;
+  private final List<ReadOperation> readOperations;
   private final List<TupleTag<?>> tupleTags;
 
-  public PartitionOperation(List<SourceOperation.TaggedSource> sources) {
-    super(sources.size());
-    this.sources = checkNotNull(sources, "sources");
-    this.tupleTags = FluentIterable.from(sources)
-        .transform(new Function<SourceOperation.TaggedSource, TupleTag<?>>() {
-          @Override
-          public TupleTag<?> apply(SourceOperation.TaggedSource input) {
-            return input.getTag();
-          }})
-        .toList();
+  public PartitionOperation(List<ReadOperation> readOperations, 
List<TupleTag<?>> tupleTags) {
+    super(readOperations.size());
+    this.readOperations = checkNotNull(readOperations, "readOperations");
+    this.tupleTags = checkNotNull(tupleTags, "tupleTags");
   }
 
-  public List<SourceOperation.TaggedSource> getTaggedSources() {
-    return sources;
+  public List<ReadOperation> getReadOperations() {
+    return readOperations;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
index e93986b..138c00e 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
@@ -20,15 +20,15 @@ package org.apache.beam.runners.mapreduce.translation;
 import org.apache.beam.sdk.io.Read;
 
 /**
- * Translates a {@link Read.Bounded} to a {@link SourceOperation}.
+ * Translates a {@link Read.Bounded} to a {@link ReadOperation}.
  */
 class ReadBoundedTranslator<T> extends 
TransformTranslator.Default<Read.Bounded<T>> {
   @Override
   public void translateNode(Read.Bounded transform, TranslationContext 
context) {
     TranslationContext.UserGraphContext userGraphContext = 
context.getUserGraphContext();
 
-    SourceOperation operation =
-        new SourceOperation(transform.getSource(), 
userGraphContext.getOnlyOutputTag());
+    ReadOperation operation =
+        new SourceReadOperation(transform.getSource(), 
userGraphContext.getOnlyOutputTag());
     context.addInitStep(
         Graphs.Step.of(userGraphContext.getStepName(), operation),
         userGraphContext.getInputTags(),

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
new file mode 100644
index 0000000..cb8b00e
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Abstract base for the read side of a fused step: a placeholder {@link Operation} used during
+ * pipeline translation.
+ *
+ * <p>A read operation is not executed element-by-element. It is resolved into a
+ * {@link TaggedSource} via {@link #getTaggedSource(Configuration)} when the job is set up, and
+ * {@link #process} fails fast if an instance ever reaches the execution graph.
+ */
+abstract class ReadOperation<T> extends Operation<T> {
+
+  /** Creates a read operation with exactly one output receiver. */
+  protected ReadOperation() {
+    super(1);
+  }
+
+  /**
+   * Always throws: read operations must not be part of the execution graph.
+   *
+   * @throws IllegalStateException unconditionally
+   */
+  @Override
+  public void process(WindowedValue elem) {
+    throw new IllegalStateException(
+        String.format(
+            "%s should not be in the execution graph.", this.getClass().getSimpleName()));
+  }
+
+  /**
+   * Returns the {@link TaggedSource} this operation reads from. Intended to be called during
+   * pipeline construction (job setup), not at execution time.
+   *
+   * @param conf the job {@link Configuration}; implementations may consult it (for example to
+   *     locate materialized input) or ignore it
+   * @return the source to read together with the tag of its output
+   */
+  abstract TaggedSource getTaggedSource(Configuration conf);
+
+  /** A {@link BoundedSource} paired with the {@link TupleTag} its elements are tagged with. */
+  @AutoValue
+  abstract static class TaggedSource implements Serializable {
+    abstract BoundedSource<?> getSource();
+
+    abstract TupleTag<?> getTag();
+
+    /** Creates a {@link TaggedSource}; the generated AutoValue constructor rejects nulls. */
+    static TaggedSource of(BoundedSource<?> boundedSource, TupleTag<?> tupleTag) {
+      return new AutoValue_ReadOperation_TaggedSource(boundedSource, tupleTag);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
index 459e93b..9a63b05 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
@@ -18,8 +18,6 @@
 package org.apache.beam.runners.mapreduce.translation;
 
 import com.google.common.collect.ImmutableList;
-import java.util.Collections;
-import java.util.List;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializableConfiguration.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializableConfiguration.java
new file mode 100644
index 0000000..7af595c
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializableConfiguration.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link Serializable} wrapper around a Hadoop {@link Configuration}.
+ *
+ * <p>The wrapped {@link Configuration} is held in a transient field and is written and read
+ * using Hadoop's own {@code write}/{@code readFields} encoding inside the custom
+ * {@code writeObject}/{@code readObject} hooks.
+ */
+class SerializableConfiguration implements Serializable {
+
+  // Not handled by default serialization; encoded manually in writeObject/readObject.
+  private transient Configuration conf;
+
+  /**
+   * Wraps the given configuration.
+   *
+   * @param conf the configuration to wrap; must not be null
+   * @throws NullPointerException if {@code conf} is null
+   */
+  SerializableConfiguration(Configuration conf) {
+    this.conf = checkNotNull(conf, "conf");
+  }
+
+  /** Returns the wrapped {@link Configuration}. */
+  Configuration getConf() {
+    return conf;
+  }
+
+  private void writeObject(ObjectOutputStream out) throws IOException {
+    out.defaultWriteObject();
+    conf.write(out);
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    // Mirror writeObject: since it calls defaultWriteObject(), the serialization contract
+    // requires consuming the default field data before reading the Hadoop-encoded payload.
+    in.defaultReadObject();
+    conf = new Configuration();
+    conf.readFields(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java
deleted file mode 100644
index 4ac850f..0000000
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.mapreduce.translation;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.auto.value.AutoValue;
-import java.io.Serializable;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * A Read.Bounded place holder {@link Operation} during pipeline translation.
- */
-class SourceOperation<T> extends Operation<T> {
-  private final TaggedSource source;
-
-  SourceOperation(BoundedSource<?> boundedSource, TupleTag<?> tupleTag) {
-    super(1);
-    checkNotNull(boundedSource, "boundedSource");
-    checkNotNull(tupleTag, "tupleTag");
-    this.source = TaggedSource.of(boundedSource, tupleTag);
-  }
-
-  @Override
-  public void process(WindowedValue elem) {
-    throw new IllegalStateException(
-        String.format("%s should not in execution graph.", 
this.getClass().getSimpleName()));
-  }
-
-  TaggedSource getTaggedSource() {
-    return source;
-  }
-
-  @AutoValue
-  abstract static class TaggedSource implements Serializable {
-    abstract BoundedSource<?> getSource();
-    abstract TupleTag<?> getTag();
-
-    static TaggedSource of(BoundedSource<?> boundedSource, TupleTag<?> 
tupleTag) {
-      return new org.apache.beam.runners.mapreduce.translation
-          .AutoValue_SourceOperation_TaggedSource(boundedSource, tupleTag);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java
new file mode 100644
index 0000000..19b0320
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Operation that reads from {@link BoundedSource}.
+ */
+public class SourceReadOperation extends ReadOperation {
+  private final TaggedSource source;
+
+  SourceReadOperation(BoundedSource<?> boundedSource, TupleTag<?> tupleTag) {
+    checkNotNull(boundedSource, "boundedSource");
+    checkNotNull(tupleTag, "tupleTag");
+    this.source = TaggedSource.of(boundedSource, tupleTag);
+  }
+
+  @Override
+  TaggedSource getTaggedSource(Configuration conf) {
+    return source;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
index dfa18c8..d018345 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
@@ -37,7 +37,7 @@ public class ViewTranslator extends 
TransformTranslator.Default<View.CreatePColl
     WindowingStrategy<?, ?> windowingStrategy = 
inPCollection.getWindowingStrategy();
 
     Graphs.Tag outTag = 
Iterables.getOnlyElement(userGraphContext.getOutputTags());
-    String fileName = outTag.getName().replaceAll("[^A-Za-z0-9]", "0");
+    String fileName = ConfigurationUtils.toFileName(outTag.getName());
 
     FileWriteOperation<?> operation = new FileWriteOperation<>(
         fileName,

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
 
b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
index ac965cb..fca6131 100644
--- 
a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
+++ 
b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
@@ -54,7 +54,7 @@ public class GraphPlannerTest {
     GraphConverter graphConverter = new GraphConverter(context);
     p.traverseTopologically(graphConverter);
 
-    GraphPlanner planner = new GraphPlanner();
+    GraphPlanner planner = new GraphPlanner(options);
     Graphs.FusedGraph fusedGraph = new 
Graphs.FusedGraph(context.getInitGraph());
     fusedGraph = planner.plan(fusedGraph);
 

Reply via email to