mr-runner: set up file paths for read and write sides of materialization.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b87ae78b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b87ae78b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b87ae78b Branch: refs/heads/mr-runner Commit: b87ae78b5e9c204e03c01b986abf8dc185b6a9ef Parents: 0ebd14c Author: Pei He <p...@apache.org> Authored: Tue Aug 8 22:07:12 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Aug 31 14:13:49 2017 +0800 ---------------------------------------------------------------------- runners/map-reduce/pom.xml | 5 ++ .../mapreduce/MapReducePipelineOptions.java | 7 ++- .../beam/runners/mapreduce/MapReduceRunner.java | 8 +-- .../mapreduce/translation/BeamInputFormat.java | 20 +++---- .../translation/ConfigurationUtils.java | 52 +++++++++++++++++ .../translation/FileReadOperation.java | 41 ++++++++----- .../translation/FileSideInputReader.java | 41 ++++++------- .../mapreduce/translation/GraphPlanner.java | 55 +++++++++++++----- .../GroupAlsoByWindowsParDoOperation.java | 1 - .../mapreduce/translation/JobPrototype.java | 29 +++++++--- .../mapreduce/translation/Operation.java | 6 ++ .../mapreduce/translation/ParDoOperation.java | 26 ++++++++- .../translation/PartitionOperation.java | 20 +++---- .../translation/ReadBoundedTranslator.java | 6 +- .../mapreduce/translation/ReadOperation.java | 57 ++++++++++++++++++ .../ReifyTimestampAndWindowsParDoOperation.java | 2 - .../translation/SerializableConfiguration.java | 52 +++++++++++++++++ .../mapreduce/translation/SourceOperation.java | 61 -------------------- .../translation/SourceReadOperation.java | 42 ++++++++++++++ .../mapreduce/translation/ViewTranslator.java | 2 +- .../mapreduce/translation/GraphPlannerTest.java | 2 +- 21 files changed, 370 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/pom.xml 
---------------------------------------------------------------------- diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml index 06e5227..e858031 100644 --- a/runners/map-reduce/pom.xml +++ b/runners/map-reduce/pom.xml @@ -113,6 +113,11 @@ <artifactId>beam-runners-core-construction-java</artifactId> </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId> + </dependency> + <!-- Module dependencies --> <dependency> <groupId>com.google.auto.service</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java index 9224eb6..cfbc006 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java @@ -43,9 +43,10 @@ public interface MapReducePipelineOptions extends PipelineOptions { Class<?> getJarClass(); void setJarClass(Class<?> jarClass); - @Description("The jar class of the user Beam program.") - String getTmpDir(); - void setTmpDir(String tmpDir); + @Description("The directory for files output.") + @Default.String("/tmp/mapreduce/") + String getFileOutputDir(); + void setFileOutputDir(String fileOutputDir); class JarClassInstanceFactory implements DefaultValueFactory<Class<?>> { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java ---------------------------------------------------------------------- diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java index a7e75bb..3f76808 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java @@ -69,18 +69,18 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult> { Graphs.FusedGraph fusedGraph = new Graphs.FusedGraph(context.getInitGraph()); LOG.info(DotfileWriter.toDotfile(fusedGraph)); - GraphPlanner planner = new GraphPlanner(); + GraphPlanner planner = new GraphPlanner(options); fusedGraph = planner.plan(fusedGraph); LOG.info(DotfileWriter.toDotfile(fusedGraph)); - Configuration config = new Configuration(); - config.set("keep.failed.task.files", "true"); - fusedGraph.getFusedSteps(); int stageId = 0; for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) { + Configuration config = new Configuration(); + config.set("keep.failed.task.files", "true"); + JobPrototype jobPrototype = JobPrototype.create(stageId++, fusedStep, options); LOG.info("Running job-{}.", stageId); LOG.info(DotfileWriter.toDotfile(fusedStep)); http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java index 23534de..10d9ada 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java @@ -53,7 +53,7 @@ public class BeamInputFormat<T> 
extends InputFormat { private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 * 1000; - private List<SourceOperation.TaggedSource> sources; + private List<ReadOperation.TaggedSource> sources; private SerializedPipelineOptions options; public BeamInputFormat() { @@ -68,7 +68,7 @@ public class BeamInputFormat<T> extends InputFormat { || Strings.isNullOrEmpty(serializedPipelineOptions)) { return ImmutableList.of(); } - sources = (List<SourceOperation.TaggedSource>) SerializableUtils.deserializeFromByteArray( + sources = (List<ReadOperation.TaggedSource>) SerializableUtils.deserializeFromByteArray( Base64.decodeBase64(serializedBoundedSource), "TaggedSources"); options = ((SerializedPipelineOptions) SerializableUtils.deserializeFromByteArray( Base64.decodeBase64(serializedPipelineOptions), "SerializedPipelineOptions")); @@ -77,17 +77,17 @@ public class BeamInputFormat<T> extends InputFormat { return FluentIterable.from(sources) .transformAndConcat( - new Function<SourceOperation.TaggedSource, Iterable<SourceOperation.TaggedSource>>() { + new Function<ReadOperation.TaggedSource, Iterable<ReadOperation.TaggedSource>>() { @Override - public Iterable<SourceOperation.TaggedSource> apply( - final SourceOperation.TaggedSource taggedSource) { + public Iterable<ReadOperation.TaggedSource> apply( + final ReadOperation.TaggedSource taggedSource) { try { return FluentIterable.from(taggedSource.getSource().split( DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options.getPipelineOptions())) - .transform(new Function<BoundedSource<?>, SourceOperation.TaggedSource>() { + .transform(new Function<BoundedSource<?>, ReadOperation.TaggedSource>() { @Override - public SourceOperation.TaggedSource apply(BoundedSource<?> input) { - return SourceOperation.TaggedSource.of(input, taggedSource.getTag()); + public ReadOperation.TaggedSource apply(BoundedSource<?> input) { + return ReadOperation.TaggedSource.of(input, taggedSource.getTag()); }}); } catch (Exception e) { 
Throwables.throwIfUnchecked(e); @@ -95,9 +95,9 @@ public class BeamInputFormat<T> extends InputFormat { } } }) - .transform(new Function<SourceOperation.TaggedSource, InputSplit>() { + .transform(new Function<ReadOperation.TaggedSource, InputSplit>() { @Override - public InputSplit apply(SourceOperation.TaggedSource taggedSource) { + public InputSplit apply(ReadOperation.TaggedSource taggedSource) { return new BeamInputSplit(taggedSource.getSource(), options, taggedSource.getTag()); }}) .toList(); http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java new file mode 100644 index 0000000..6d7a81a --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.mapreduce.translation; + +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResolveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +/** + * Utilities to handle {@link Configuration}. + */ +public class ConfigurationUtils { + + public static ResourceId getResourceIdForOutput(String fileName, Configuration conf) { + ResourceId outDir = FileSystems.matchNewResource(conf.get(FileOutputFormat.OUTDIR), true); + return outDir.resolve(fileName, ResolveOptions.StandardResolveOptions.RESOLVE_FILE); + } + + public static String getFileOutputDir(String baseFileOutputDir, int stageId) { + if (baseFileOutputDir.endsWith("/")) { + return String.format("%sstage-%d", baseFileOutputDir, stageId); + } else { + return String.format("%s/stage-%d", baseFileOutputDir, stageId); + } + } + + public static String getFileOutputPath(String baseFileOutputDir, int stageId, String fileName) { + return String.format("%s/%s", getFileOutputDir(baseFileOutputDir, stageId), fileName); + } + + public static String toFileName(String tagName) { + return tagName.replaceAll("[^A-Za-z0-9]", "0"); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java index 70263c3..a95e79e 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java @@ -44,29 +44,41 @@ import 
org.apache.hadoop.io.SequenceFile; /** * Operation that reads from files. */ -public class FileReadOperation<T> extends SourceOperation<WindowedValue<T>> { +public class FileReadOperation<T> extends ReadOperation<WindowedValue<T>> { + + private final String fileName; + private final Coder<?> coder; + private final TupleTag<?> tupleTag; public FileReadOperation( - int producerStageId, String fileName, Coder<T> coder, TupleTag<?> tupleTag) { - super(new FileBoundedSource<>(producerStageId, fileName, coder), tupleTag); + super(); + this.fileName = checkNotNull(fileName, "fileName"); + this.coder = checkNotNull(coder, "coder"); + this.tupleTag = checkNotNull(tupleTag, "tupleTag"); + } + + @Override + TaggedSource getTaggedSource(Configuration conf) { + return TaggedSource.of( + new FileBoundedSource<>(fileName, coder, new SerializableConfiguration(conf)), + tupleTag); } private static class FileBoundedSource<T> extends BoundedSource<WindowedValue<T>> { - private final int producerStageId; private final String fileName; private final Coder<WindowedValue<T>> coder; + private final SerializableConfiguration conf; - FileBoundedSource(int producerStageId, String fileName, Coder<T> coder) { - this.producerStageId = producerStageId; + FileBoundedSource(String fileName, Coder<T> coder, SerializableConfiguration conf) { this.fileName = checkNotNull(fileName, "fileName"); checkNotNull(coder, "coder"); this.coder = WindowedValue.getFullCoder( coder, WindowingStrategy.globalDefault().getWindowFn().windowCoder()); - + this.conf = checkNotNull(conf, "conf"); } @Override @@ -84,18 +96,15 @@ public class FileReadOperation<T> extends SourceOperation<WindowedValue<T>> { @Override public BoundedReader<WindowedValue<T>> createReader(PipelineOptions options) throws IOException { - Path pattern = new Path(String.format("/tmp/mapreduce/stage-2/%s*", fileName)); - // TODO: use config from the job. 
- Configuration conf = new Configuration(); - conf.set( - "io.serializations", - "org.apache.hadoop.io.serializer.WritableSerialization," - + "org.apache.hadoop.io.serializer.JavaSerialization"); - FileSystem fs = pattern.getFileSystem(conf); + Path pattern = new Path( + ConfigurationUtils.getResourceIdForOutput(fileName, conf.getConf()) + "*"); + + FileSystem fs = pattern.getFileSystem(conf.getConf()); FileStatus[] files = fs.globStatus(pattern); + Queue<SequenceFile.Reader> readers = new LinkedList<>(); for (FileStatus f : files) { - readers.add(new SequenceFile.Reader(fs, f.getPath(), conf)); + readers.add(new SequenceFile.Reader(fs, f.getPath(), conf.getConf())); } return new Reader<>(this, readers, coder); } http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java index 18bff2a..cb3a8c4 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java @@ -17,10 +17,11 @@ */ package org.apache.beam.runners.mapreduce.translation; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; @@ -47,22 +48,23 @@ import org.apache.hadoop.io.SequenceFile; */ public class FileSideInputReader implements SideInputReader { - private final Map<TupleTag<?>, 
String> tupleTagToFileName; + private final Map<TupleTag<?>, String> tupleTagToFilePath; private final Map<TupleTag<?>, Coder<?>> tupleTagToCoder; + private final Configuration conf; - public FileSideInputReader(List<Graphs.Tag> sideInputTags) { - this.tupleTagToFileName = Maps.newHashMap(); - this.tupleTagToCoder = Maps.newHashMap(); - for (Graphs.Tag tag : sideInputTags) { - tupleTagToFileName.put(tag.getTupleTag(), toFileName(tag.getName())); - tupleTagToCoder.put(tag.getTupleTag(), tag.getCoder()); - } + public FileSideInputReader( + Map<TupleTag<?>, String> tupleTagToFilePath, + Map<TupleTag<?>, Coder<?>> tupleTagToCoder, + Configuration conf) { + this.tupleTagToFilePath = checkNotNull(tupleTagToFilePath, "tupleTagToFilePath"); + this.tupleTagToCoder = checkNotNull(tupleTagToCoder, "tupleTagToCoder"); + this.conf = checkNotNull(conf, "conf"); } @Nullable @Override public <T> T get(PCollectionView<T> view, BoundedWindow window) { - String fileName = tupleTagToFileName.get(view.getTagInternal()); + String filePath = tupleTagToFilePath.get(view.getTagInternal()); IterableCoder<WindowedValue<?>> coder = (IterableCoder<WindowedValue<?>>) tupleTagToCoder.get(view.getTagInternal()); Coder<WindowedValue<?>> elemCoder = coder.getElemCoder(); @@ -70,16 +72,11 @@ public class FileSideInputReader implements SideInputReader { final BoundedWindow sideInputWindow = view.getWindowMappingFn().getSideInputWindow(window); - Path pattern = new Path(String.format("/tmp/mapreduce/stage-1/%s*", fileName)); - Configuration conf = new Configuration(); - conf.set( - "io.serializations", - "org.apache.hadoop.io.serializer.WritableSerialization," - + "org.apache.hadoop.io.serializer.JavaSerialization"); + Path pattern = new Path(filePath + "*"); try { - FileSystem fs; - fs = pattern.getFileSystem(conf); + FileSystem fs = pattern.getFileSystem(conf); FileStatus[] files = fs.globStatus(pattern); + // TODO: handle empty views which may result in no files case. 
SequenceFile.Reader reader = new SequenceFile.Reader(fs, files[0].getPath(), conf); List<WindowedValue<?>> availableSideInputs = new ArrayList<>(); @@ -114,15 +111,11 @@ public class FileSideInputReader implements SideInputReader { @Override public <T> boolean contains(PCollectionView<T> view) { - return tupleTagToFileName.containsKey(view.getTagInternal()); + return tupleTagToFilePath.containsKey(view.getTagInternal()); } @Override public boolean isEmpty() { - return tupleTagToFileName.isEmpty(); - } - - private String toFileName(String tagName) { - return tagName.replaceAll("[^A-Za-z0-9]", "0"); + return tupleTagToFilePath.isEmpty(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java index 7c76823..b6e134e 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java @@ -17,17 +17,16 @@ */ package org.apache.beam.runners.mapreduce.translation; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import java.util.ArrayList; import java.util.List; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; +import java.util.Map; +import 
org.apache.beam.runners.mapreduce.MapReducePipelineOptions; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; @@ -37,8 +36,10 @@ import org.apache.beam.sdk.values.WindowingStrategy; */ public class GraphPlanner { + private final MapReducePipelineOptions options; - public GraphPlanner() { + public GraphPlanner(MapReducePipelineOptions options) { + this.options = checkNotNull(options, "options"); } public Graphs.FusedGraph plan(Graphs.FusedGraph fusedGraph) { @@ -54,7 +55,7 @@ public class GraphPlanner { continue; } String tagName = tag.getName(); - String fileName = tagName.replaceAll("[^A-Za-z0-9]", "0"); + String fileName = ConfigurationUtils.toFileName(tagName); // TODO: should not hard-code windows coder. WindowedValue.WindowedValueCoder<?> writeValueCoder = WindowedValue.getFullCoder( @@ -77,11 +78,13 @@ public class GraphPlanner { consumer.addEdge(readOutput, step); } consumer.removeTag(tag); + + String filePath = ConfigurationUtils.getFileOutputPath( + options.getFileOutputDir(), fusedStep.getStageId(), fileName); consumer.addStep( Graphs.Step.of( readStepName, - new FileReadOperation( - fusedStep.getStageId(), fileName, tag.getCoder(), tag.getTupleTag())), + new FileReadOperation(filePath, tag.getCoder(), tag.getTupleTag())), ImmutableList.<Graphs.Tag>of(), ImmutableList.of(readOutput)); } @@ -92,13 +95,13 @@ public class GraphPlanner { for (final Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) { List<Graphs.Step> readSteps = fusedStep.getStartSteps(); - List<SourceOperation.TaggedSource> sources = new ArrayList<>(); + List<ReadOperation> readOperations = new ArrayList<>(); List<Graphs.Tag> readOutTags = new ArrayList<>(); List<TupleTag<?>> readOutTupleTags = new ArrayList<>(); StringBuilder partitionStepName = new StringBuilder(); for (Graphs.Step step : readSteps) { - checkState(step.getOperation() instanceof SourceOperation); - 
sources.add(((SourceOperation) step.getOperation()).getTaggedSource()); + checkState(step.getOperation() instanceof ReadOperation); + readOperations.add(((ReadOperation) step.getOperation())); Graphs.Tag tag = Iterables.getOnlyElement(fusedStep.getOutputTags(step)); readOutTags.add(tag); readOutTupleTags.add(tag.getTupleTag()); @@ -110,10 +113,34 @@ public class GraphPlanner { partitionStepName.deleteCharAt(partitionStepName.length() - 1); } - Graphs.Step partitionStep = - Graphs.Step.of(partitionStepName.toString(), new PartitionOperation(sources)); + Graphs.Step partitionStep = Graphs.Step.of( + partitionStepName.toString(), new PartitionOperation(readOperations, readOutTupleTags)); fusedStep.addStep(partitionStep, ImmutableList.<Graphs.Tag>of(), readOutTags); } + + // Setup side inputs + for (final Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) { + for (Graphs.Step step : fusedStep.getSteps()) { + if (!(step.getOperation() instanceof ParDoOperation)) { + continue; + } + ParDoOperation parDo = (ParDoOperation) step.getOperation(); + List<Graphs.Tag> sideInputTags = parDo.getSideInputTags(); + if (sideInputTags.size() == 0) { + continue; + } + Map<TupleTag<?>, String> tupleTagToFilePath = Maps.newHashMap(); + for (Graphs.Tag sideInTag : sideInputTags) { + tupleTagToFilePath.put( + sideInTag.getTupleTag(), + ConfigurationUtils.getFileOutputPath( + options.getFileOutputDir(), + fusedGraph.getProducer(sideInTag).getStageId(), + ConfigurationUtils.toFileName(sideInTag.getName()))); + } + parDo.setupSideInput(tupleTagToFilePath); + } + } return fusedGraph; } } http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java index 471c7f5..768f17c 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.ImmutableList; -import java.util.List; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java index 677f3a7..9f291d5 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java @@ -20,15 +20,17 @@ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.beam.runners.mapreduce.MapReducePipelineOptions; import 
org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; @@ -49,23 +51,23 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class JobPrototype { public static JobPrototype create( - int stageId, Graphs.FusedStep fusedStep, PipelineOptions options) { + int stageId, Graphs.FusedStep fusedStep, MapReducePipelineOptions options) { return new JobPrototype(stageId, fusedStep, options); } private final int stageId; private final Graphs.FusedStep fusedStep; - private final PipelineOptions options; + private final MapReducePipelineOptions options; - private JobPrototype(int stageId, Graphs.FusedStep fusedStep, PipelineOptions options) { + private JobPrototype(int stageId, Graphs.FusedStep fusedStep, MapReducePipelineOptions options) { this.stageId = stageId; this.fusedStep = checkNotNull(fusedStep, "fusedStep"); this.options = checkNotNull(options, "options"); } - public Job build(Class<?> jarClass, Configuration conf) throws IOException { - Job job = new Job(conf); - conf = job.getConfiguration(); + public Job build(Class<?> jarClass, Configuration initConf) throws IOException { + Job job = new Job(initConf); + final Configuration conf = job.getConfiguration(); job.setJarByClass(jarClass); conf.set( "io.serializations", @@ -75,17 +77,26 @@ public class JobPrototype { //TODO: config out dir with PipelineOptions. conf.set( FileOutputFormat.OUTDIR, - String.format("/tmp/mapreduce/stage-%d", fusedStep.getStageId())); + ConfigurationUtils.getFileOutputDir(options.getFileOutputDir(), fusedStep.getStageId())); // Setup BoundedSources in BeamInputFormat. 
Graphs.Step startStep = Iterables.getOnlyElement(fusedStep.getStartSteps()); checkState(startStep.getOperation() instanceof PartitionOperation); PartitionOperation partitionOperation = (PartitionOperation) startStep.getOperation(); + ArrayList<ReadOperation.TaggedSource> taggedSources = new ArrayList<>(); + taggedSources.addAll(FluentIterable.from(partitionOperation + .getReadOperations()) + .transform(new Function<ReadOperation, ReadOperation.TaggedSource>() { + @Override + public ReadOperation.TaggedSource apply(ReadOperation operation) { + return operation.getTaggedSource(conf); + }}) + .toList()); conf.set( BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE, Base64.encodeBase64String(SerializableUtils.serializeToByteArray( - new ArrayList<>(partitionOperation.getTaggedSources())))); + taggedSources))); conf.set( BeamInputFormat.BEAM_SERIALIZED_PIPELINE_OPTIONS, Base64.encodeBase64String(SerializableUtils.serializeToByteArray( http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java index 7504e1c..bd24f05 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java @@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext; */ public abstract class Operation<T> implements Serializable { private final OutputReceiver[] receivers; + private SerializableConfiguration conf; public Operation(int numOutputs) { this.receivers = new OutputReceiver[numOutputs]; @@ -44,6 +45,7 @@ public abstract class Operation<T> implements Serializable { * <p>Called 
after all successors consuming operations have been started. */ public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) { + conf = new SerializableConfiguration(taskContext.getConfiguration()); for (OutputReceiver receiver : receivers) { if (receiver == null) { continue; @@ -75,6 +77,10 @@ public abstract class Operation<T> implements Serializable { } } + public SerializableConfiguration getConf() { + return conf; + } + public List<OutputReceiver> getOutputReceivers() { // TODO: avoid allocating objects for each output emit. return ImmutableList.copyOf(receivers); http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java index 020bfbe..947d773 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java @@ -20,10 +20,14 @@ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.common.collect.Maps; import java.util.List; +import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; @@ -40,8 +44,10 @@ public abstract class 
ParDoOperation<InputT, OutputT> extends Operation<InputT> protected final SerializedPipelineOptions options; protected final TupleTag<OutputT> mainOutputTag; private final List<TupleTag<?>> sideOutputTags; - private final List<Graphs.Tag> sideInputTags; protected final WindowingStrategy<?, ?> windowingStrategy; + private final List<Graphs.Tag> sideInputTags; + private Map<TupleTag<?>, String> tupleTagToFilePath; + protected DoFnInvoker<InputT, OutputT> doFnInvoker; private DoFnRunner<InputT, OutputT> fnRunner; @@ -56,8 +62,8 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> this.options = new SerializedPipelineOptions(checkNotNull(options, "options")); this.mainOutputTag = checkNotNull(mainOutputTag, "mainOutputTag"); this.sideOutputTags = checkNotNull(sideOutputTags, "sideOutputTags"); - this.sideInputTags = checkNotNull(sideInputTags, "sideInputTags"); this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy"); + this.sideInputTags = checkNotNull(sideInputTags, "sideInputTags"); } /** @@ -73,10 +79,16 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> doFnInvoker = DoFnInvokers.invokerFor(doFn); doFnInvoker.invokeSetup(); + Map<TupleTag<?>, Coder<?>> tupleTagToCoder = Maps.newHashMap(); + for (Graphs.Tag tag : sideInputTags) { + tupleTagToCoder.put(tag.getTupleTag(), tag.getCoder()); + } fnRunner = DoFnRunners.simpleRunner( options.getPipelineOptions(), getDoFn(), - new FileSideInputReader(sideInputTags), + sideInputTags.isEmpty() + ? 
NullSideInputReader.empty() : + new FileSideInputReader(tupleTagToFilePath, tupleTagToCoder, getConf().getConf()), createOutputManager(), mainOutputTag, sideOutputTags, @@ -100,6 +112,14 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> super.finish(); } + public void setupSideInput(Map<TupleTag<?>, String> tupleTagToFilePath) { + this.tupleTagToFilePath = checkNotNull(tupleTagToFilePath, "tupleTagToFilePath"); + } + + public List<Graphs.Tag> getSideInputTags() { + return sideInputTags; + } + @Override protected int getOutputIndex(TupleTag<?> tupleTag) { if (tupleTag == mainOutputTag) { http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java index b8aefd6..687b5b9 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java @@ -33,23 +33,17 @@ import org.apache.beam.sdk.values.TupleTag; */ public class PartitionOperation extends Operation<KV<TupleTag<?>, Object>> { - private final List<SourceOperation.TaggedSource> sources; + private final List<ReadOperation> readOperations; private final List<TupleTag<?>> tupleTags; - public PartitionOperation(List<SourceOperation.TaggedSource> sources) { - super(sources.size()); - this.sources = checkNotNull(sources, "sources"); - this.tupleTags = FluentIterable.from(sources) - .transform(new Function<SourceOperation.TaggedSource, TupleTag<?>>() { - @Override - public TupleTag<?> apply(SourceOperation.TaggedSource input) { - return input.getTag(); 
- }}) - .toList(); + public PartitionOperation(List<ReadOperation> readOperations, List<TupleTag<?>> tupleTags) { + super(readOperations.size()); + this.readOperations = checkNotNull(readOperations, "readOperations"); + this.tupleTags = checkNotNull(tupleTags, "tupleTags"); } - public List<SourceOperation.TaggedSource> getTaggedSources() { - return sources; + public List<ReadOperation> getReadOperations() { + return readOperations; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java index e93986b..138c00e 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java @@ -20,15 +20,15 @@ package org.apache.beam.runners.mapreduce.translation; import org.apache.beam.sdk.io.Read; /** - * Translates a {@link Read.Bounded} to a {@link SourceOperation}. + * Translates a {@link Read.Bounded} to a {@link ReadOperation}. 
*/ class ReadBoundedTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> { @Override public void translateNode(Read.Bounded transform, TranslationContext context) { TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - SourceOperation operation = - new SourceOperation(transform.getSource(), userGraphContext.getOnlyOutputTag()); + ReadOperation operation = + new SourceReadOperation(transform.getSource(), userGraphContext.getOnlyOutputTag()); context.addInitStep( Graphs.Step.of(userGraphContext.getStepName(), operation), userGraphContext.getInputTags(), http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java new file mode 100644 index 0000000..cb8b00e --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.hadoop.conf.Configuration; + +/** + * A Read.Bounded place holder {@link Operation} during pipeline translation. + */ +abstract class ReadOperation<T> extends Operation<T> { + + public ReadOperation() { + super(1); + } + + @Override + public void process(WindowedValue elem) { + throw new IllegalStateException( + String.format("%s should not in execution graph.", this.getClass().getSimpleName())); + } + + /** + * Returns a TaggedSource during pipeline construction time. + */ + abstract TaggedSource getTaggedSource(Configuration conf); + + @AutoValue + abstract static class TaggedSource implements Serializable { + abstract BoundedSource<?> getSource(); + abstract TupleTag<?> getTag(); + + static TaggedSource of(BoundedSource<?> boundedSource, TupleTag<?> tupleTag) { + return new org.apache.beam.runners.mapreduce.translation + .AutoValue_ReadOperation_TaggedSource(boundedSource, tupleTag); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java index 459e93b..9a63b05 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java +++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java @@ -18,8 +18,6 @@ package org.apache.beam.runners.mapreduce.translation; import com.google.common.collect.ImmutableList; -import java.util.Collections; -import java.util.List; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializableConfiguration.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializableConfiguration.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializableConfiguration.java new file mode 100644 index 0000000..7af595c --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializableConfiguration.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.mapreduce.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import org.apache.hadoop.conf.Configuration; + +/** + * A {@link Serializable} {@link Configuration}. + */ +class SerializableConfiguration implements Serializable { + + private transient Configuration conf; + + SerializableConfiguration(Configuration conf) { + this.conf = checkNotNull(conf, "conf"); + } + + Configuration getConf() { + return conf; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + conf.write(out); + } + + private void readObject(ObjectInputStream in) throws IOException { + conf = new Configuration(); + conf.readFields(in); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java deleted file mode 100644 index 4ac850f..0000000 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.mapreduce.translation; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.auto.value.AutoValue; -import java.io.Serializable; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; - -/** - * A Read.Bounded place holder {@link Operation} during pipeline translation. - */ -class SourceOperation<T> extends Operation<T> { - private final TaggedSource source; - - SourceOperation(BoundedSource<?> boundedSource, TupleTag<?> tupleTag) { - super(1); - checkNotNull(boundedSource, "boundedSource"); - checkNotNull(tupleTag, "tupleTag"); - this.source = TaggedSource.of(boundedSource, tupleTag); - } - - @Override - public void process(WindowedValue elem) { - throw new IllegalStateException( - String.format("%s should not in execution graph.", this.getClass().getSimpleName())); - } - - TaggedSource getTaggedSource() { - return source; - } - - @AutoValue - abstract static class TaggedSource implements Serializable { - abstract BoundedSource<?> getSource(); - abstract TupleTag<?> getTag(); - - static TaggedSource of(BoundedSource<?> boundedSource, TupleTag<?> tupleTag) { - return new org.apache.beam.runners.mapreduce.translation - .AutoValue_SourceOperation_TaggedSource(boundedSource, tupleTag); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java 
---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java new file mode 100644 index 0000000..19b0320 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.hadoop.conf.Configuration; + +/** + * Operation that reads from {@link BoundedSource}. 
+ */ +public class SourceReadOperation extends ReadOperation { + private final TaggedSource source; + + SourceReadOperation(BoundedSource<?> boundedSource, TupleTag<?> tupleTag) { + checkNotNull(boundedSource, "boundedSource"); + checkNotNull(tupleTag, "tupleTag"); + this.source = TaggedSource.of(boundedSource, tupleTag); + } + + @Override + TaggedSource getTaggedSource(Configuration conf) { + return source; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java index dfa18c8..d018345 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java @@ -37,7 +37,7 @@ public class ViewTranslator extends TransformTranslator.Default<View.CreatePColl WindowingStrategy<?, ?> windowingStrategy = inPCollection.getWindowingStrategy(); Graphs.Tag outTag = Iterables.getOnlyElement(userGraphContext.getOutputTags()); - String fileName = outTag.getName().replaceAll("[^A-Za-z0-9]", "0"); + String fileName = ConfigurationUtils.toFileName(outTag.getName()); FileWriteOperation<?> operation = new FileWriteOperation<>( fileName, http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java 
b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java index ac965cb..fca6131 100644 --- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java +++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java @@ -54,7 +54,7 @@ public class GraphPlannerTest { GraphConverter graphConverter = new GraphConverter(context); p.traverseTopologically(graphConverter); - GraphPlanner planner = new GraphPlanner(); + GraphPlanner planner = new GraphPlanner(options); Graphs.FusedGraph fusedGraph = new Graphs.FusedGraph(context.getInitGraph()); fusedGraph = planner.plan(fusedGraph);