mr-runner: support side inputs by reading in the contents of all views.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ebd14c4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ebd14c4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ebd14c4 Branch: refs/heads/mr-runner Commit: 0ebd14c446421bdb29a95ae231975875b4532031 Parents: e562a44 Author: Pei He <[email protected]> Authored: Tue Aug 8 17:38:58 2017 +0800 Committer: Pei He <[email protected]> Committed: Thu Aug 31 14:13:49 2017 +0800 ---------------------------------------------------------------------- .../translation/FileReadOperation.java | 2 +- .../translation/FileSideInputReader.java | 128 +++++++++++++++++++ .../runners/mapreduce/translation/Graphs.java | 3 +- .../GroupAlsoByWindowsParDoOperation.java | 3 +- .../translation/NormalParDoOperation.java | 3 +- .../mapreduce/translation/ParDoOperation.java | 6 +- .../mapreduce/translation/ParDoTranslator.java | 1 + .../ReifyTimestampAndWindowsParDoOperation.java | 4 +- .../translation/TranslationContext.java | 25 ++++ 9 files changed, 168 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java index 6bd893a..70263c3 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java @@ -95,7 +95,7 @@ public class FileReadOperation<T> extends SourceOperation<WindowedValue<T>> { FileStatus[] files = 
fs.globStatus(pattern); Queue<SequenceFile.Reader> readers = new LinkedList<>(); for (FileStatus f : files) { - readers.add(new SequenceFile.Reader(fs, files[0].getPath(), conf)); + readers.add(new SequenceFile.Reader(fs, f.getPath(), conf)); } return new Reader<>(this, readers, coder); } http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java new file mode 100644 index 0000000..18bff2a --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.mapreduce.translation; + +import com.google.common.base.Predicate; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; + +/** + * Files based {@link SideInputReader}. 
+ */ +public class FileSideInputReader implements SideInputReader { + + private final Map<TupleTag<?>, String> tupleTagToFileName; + private final Map<TupleTag<?>, Coder<?>> tupleTagToCoder; + + public FileSideInputReader(List<Graphs.Tag> sideInputTags) { + this.tupleTagToFileName = Maps.newHashMap(); + this.tupleTagToCoder = Maps.newHashMap(); + for (Graphs.Tag tag : sideInputTags) { + tupleTagToFileName.put(tag.getTupleTag(), toFileName(tag.getName())); + tupleTagToCoder.put(tag.getTupleTag(), tag.getCoder()); + } + } + + @Nullable + @Override + public <T> T get(PCollectionView<T> view, BoundedWindow window) { + String fileName = tupleTagToFileName.get(view.getTagInternal()); + IterableCoder<WindowedValue<?>> coder = + (IterableCoder<WindowedValue<?>>) tupleTagToCoder.get(view.getTagInternal()); + Coder<WindowedValue<?>> elemCoder = coder.getElemCoder(); + + final BoundedWindow sideInputWindow = + view.getWindowMappingFn().getSideInputWindow(window); + + Path pattern = new Path(String.format("/tmp/mapreduce/stage-1/%s*", fileName)); + Configuration conf = new Configuration(); + conf.set( + "io.serializations", + "org.apache.hadoop.io.serializer.WritableSerialization," + + "org.apache.hadoop.io.serializer.JavaSerialization"); + try { + FileSystem fs; + fs = pattern.getFileSystem(conf); + FileStatus[] files = fs.globStatus(pattern); + SequenceFile.Reader reader = new SequenceFile.Reader(fs, files[0].getPath(), conf); + + List<WindowedValue<?>> availableSideInputs = new ArrayList<>(); + BytesWritable value = new BytesWritable(); + while (reader.next(NullWritable.get(), value)) { + ByteArrayInputStream inStream = new ByteArrayInputStream(value.getBytes()); + availableSideInputs.add(elemCoder.decode(inStream)); + } + Iterable<WindowedValue<?>> sideInputForWindow = + Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() { + @Override + public boolean apply(@Nullable WindowedValue<?> sideInputCandidate) { + if (sideInputCandidate == null) { + return 
false; + } + // first match of a sideInputWindow to the elementWindow is good enough. + for (BoundedWindow sideInputCandidateWindow: sideInputCandidate.getWindows()) { + if (sideInputCandidateWindow.equals(sideInputWindow)) { + return true; + } + } + // no match found. + return false; + } + }); + return view.getViewFn().apply(sideInputForWindow); + } catch (IOException e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + } + + @Override + public <T> boolean contains(PCollectionView<T> view) { + return tupleTagToFileName.containsKey(view.getTagInternal()); + } + + @Override + public boolean isEmpty() { + return tupleTagToFileName.isEmpty(); + } + + private String toFileName(String tagName) { + return tagName.replaceAll("[^A-Za-z0-9]", "0"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java index 9743d09..b2f793a 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.mapreduce.translation; import com.google.auto.value.AutoValue; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import java.io.Serializable; import java.util.List; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; @@ -229,7 +230,7 @@ public class Graphs { } @AutoValue - public abstract static class Tag extends Graph.AbstractTag { + public abstract static class Tag extends Graph.AbstractTag implements Serializable { abstract String getName(); abstract 
TupleTag<?> getTupleTag(); abstract Coder<?> getCoder(); http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java index 1ae38da..471c7f5 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java @@ -40,7 +40,8 @@ public class GroupAlsoByWindowsParDoOperation extends ParDoOperation { WindowingStrategy<?, ?> windowingStrategy, Coder<?> inputCoder, Graphs.Tag outTag) { - super(options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(), windowingStrategy); + super(options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(), + ImmutableList.<Graphs.Tag>of(), windowingStrategy); this.inputCoder = checkNotNull(inputCoder, "inputCoder"); } http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java index fd1b528..58a7d6d 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java @@ -37,8 
+37,9 @@ public class NormalParDoOperation<InputT, OutputT> extends ParDoOperation<InputT PipelineOptions options, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, + List<Graphs.Tag> sideInputTags, WindowingStrategy<?, ?> windowingStrategy) { - super(options, mainOutputTag, sideOutputTags, windowingStrategy); + super(options, mainOutputTag, sideOutputTags, sideInputTags, windowingStrategy); this.doFn = checkNotNull(doFn, "doFn"); } http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java index c6bf49c..020bfbe 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java @@ -24,7 +24,6 @@ import java.util.List; import javax.annotation.Nullable; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; @@ -41,6 +40,7 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> protected final SerializedPipelineOptions options; protected final TupleTag<OutputT> mainOutputTag; private final List<TupleTag<?>> sideOutputTags; + private final List<Graphs.Tag> sideInputTags; protected final WindowingStrategy<?, ?> windowingStrategy; protected DoFnInvoker<InputT, OutputT> doFnInvoker; @@ -50,11 +50,13 @@ public abstract class ParDoOperation<InputT, OutputT> 
extends Operation<InputT> PipelineOptions options, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, + List<Graphs.Tag> sideInputTags, WindowingStrategy<?, ?> windowingStrategy) { super(1 + sideOutputTags.size()); this.options = new SerializedPipelineOptions(checkNotNull(options, "options")); this.mainOutputTag = checkNotNull(mainOutputTag, "mainOutputTag"); this.sideOutputTags = checkNotNull(sideOutputTags, "sideOutputTags"); + this.sideInputTags = checkNotNull(sideInputTags, "sideInputTags"); this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy"); } @@ -74,7 +76,7 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> fnRunner = DoFnRunners.simpleRunner( options.getPipelineOptions(), getDoFn(), - NullSideInputReader.empty(), + new FileSideInputReader(sideInputTags), createOutputManager(), mainOutputTag, sideOutputTags, http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java index 9bd89fd..ae23f71 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java @@ -35,6 +35,7 @@ class ParDoTranslator<InputT, OutputT> userGraphContext.getOptions(), transform.getMainOutputTag(), transform.getAdditionalOutputTags().getAll(), + userGraphContext.getSideInputTags(), ((PCollection) userGraphContext.getInput()).getWindowingStrategy()); context.addInitStep( Graphs.Step.of(userGraphContext.getStepName(), operation), 
http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java index 251828e..459e93b 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.mapreduce.translation; import com.google.common.collect.ImmutableList; +import java.util.Collections; import java.util.List; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; @@ -36,7 +37,8 @@ public class ReifyTimestampAndWindowsParDoOperation extends ParDoOperation { PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, Graphs.Tag outTag) { - super(options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(), windowingStrategy); + super(options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(), + ImmutableList.<Graphs.Tag>of(), windowingStrategy); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java index da8ebff..fd6c0ba 100644 --- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java @@ -136,6 +136,31 @@ public class TranslationContext { .toList(); } + public List<Graphs.Tag> getSideInputTags() { + if (!(currentNode.getTransform() instanceof ParDo.MultiOutput)) { + return ImmutableList.of(); + } + return FluentIterable.from(((ParDo.MultiOutput) currentNode.getTransform()).getSideInputs()) + .transform(new Function<PValue, Graphs.Tag>() { + @Override + public Graphs.Tag apply(PValue pValue) { + checkState( + pValueToTupleTag.containsKey(pValue), + String.format("Failed to find TupleTag for pValue: %s.", pValue)); + if (pValue instanceof PCollection) { + PCollection<?> pc = (PCollection<?>) pValue; + return Graphs.Tag.of( + pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder()); + } else { + return Graphs.Tag.of( + pValue.getName(), + pValueToTupleTag.get(pValue), + ((PCollectionView) pValue).getCoderInternal()); + } + }}) + .toList(); + } + public List<Graphs.Tag> getOutputTags() { if (currentNode.getTransform() instanceof View.CreatePCollectionView) { PCollectionView view = ((View.CreatePCollectionView) currentNode.getTransform()).getView();
