mr-runner: support multiple SourceOperations by composing and partitioning.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e562a443 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e562a443 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e562a443 Branch: refs/heads/mr-runner Commit: e562a4432d407759876e147fdeb132518a1c9637 Parents: 40396d7 Author: Pei He <p...@apache.org> Authored: Tue Aug 8 15:49:04 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Aug 31 14:13:49 2017 +0800 ---------------------------------------------------------------------- .../mapreduce/translation/BeamInputFormat.java | 72 +++++++++++++------- .../mapreduce/translation/BeamMapper.java | 3 +- .../translation/FileReadOperation.java | 9 ++- .../translation/FileWriteOperation.java | 8 +-- .../mapreduce/translation/GraphPlanner.java | 51 ++++++++++++-- .../runners/mapreduce/translation/Graphs.java | 2 +- .../mapreduce/translation/JobPrototype.java | 28 ++------ .../mapreduce/translation/Operation.java | 1 + .../translation/PartitionOperation.java | 72 ++++++++++++++++++++ .../translation/ReadBoundedTranslator.java | 3 +- .../mapreduce/translation/SourceOperation.java | 24 +++++-- .../mapreduce/translation/ViewOperation.java | 59 ---------------- .../mapreduce/translation/ViewTranslator.java | 19 ++++-- 13 files changed, 224 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java index 03a88aa..23534de 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; @@ -52,7 +53,7 @@ public class BeamInputFormat<T> extends InputFormat { private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 * 1000; - private List<BoundedSource<T>> sources; + private List<SourceOperation.TaggedSource> sources; private SerializedPipelineOptions options; public BeamInputFormat() { @@ -67,30 +68,37 @@ public class BeamInputFormat<T> extends InputFormat { || Strings.isNullOrEmpty(serializedPipelineOptions)) { return ImmutableList.of(); } - sources = (List<BoundedSource<T>>) SerializableUtils.deserializeFromByteArray( - Base64.decodeBase64(serializedBoundedSource), "BoundedSource"); + sources = (List<SourceOperation.TaggedSource>) SerializableUtils.deserializeFromByteArray( + Base64.decodeBase64(serializedBoundedSource), "TaggedSources"); options = ((SerializedPipelineOptions) SerializableUtils.deserializeFromByteArray( Base64.decodeBase64(serializedPipelineOptions), "SerializedPipelineOptions")); try { return FluentIterable.from(sources) - .transformAndConcat(new Function<BoundedSource<T>, Iterable<BoundedSource<T>>>() { + .transformAndConcat( + new Function<SourceOperation.TaggedSource, Iterable<SourceOperation.TaggedSource>>() { + @Override + public Iterable<SourceOperation.TaggedSource> apply( + final SourceOperation.TaggedSource taggedSource) { + try { + return FluentIterable.from(taggedSource.getSource().split( + DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options.getPipelineOptions())) + .transform(new Function<BoundedSource<?>, SourceOperation.TaggedSource>() { + @Override + public SourceOperation.TaggedSource apply(BoundedSource<?> input) { + return SourceOperation.TaggedSource.of(input, taggedSource.getTag()); + }}); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + } + }) + .transform(new Function<SourceOperation.TaggedSource, InputSplit>() { @Override - public Iterable<BoundedSource<T>> apply(BoundedSource<T> input) { - try { - return (Iterable<BoundedSource<T>>) input.split( - DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options.getPipelineOptions()); - } catch (Exception e) { - Throwables.throwIfUnchecked(e); - throw new RuntimeException(e); - } - } - }) - .transform(new Function<BoundedSource<T>, InputSplit>() { - @Override - public InputSplit apply(BoundedSource<T> source) { - return new BeamInputSplit(source, options); + public InputSplit apply(SourceOperation.TaggedSource taggedSource) { + return new BeamInputSplit(taggedSource.getSource(), options, taggedSource.getTag()); }}) .toList(); } catch (Exception e) { @@ -107,17 +115,23 @@ public class BeamInputFormat<T> extends InputFormat { public static class BeamInputSplit<T> extends InputSplit implements Writable { private BoundedSource<T> boundedSource; private SerializedPipelineOptions options; + private TupleTag<?> tupleTag; public BeamInputSplit() { } - public BeamInputSplit(BoundedSource<T> boundedSource, SerializedPipelineOptions options) { + public BeamInputSplit( + BoundedSource<T> boundedSource, + SerializedPipelineOptions options, + TupleTag<?> tupleTag) { this.boundedSource = checkNotNull(boundedSource, "boundedSources"); this.options = checkNotNull(options, "options"); + this.tupleTag = checkNotNull(tupleTag, "tupleTag"); } public BeamRecordReader<T> createReader() throws IOException { - return new BeamRecordReader<>(boundedSource.createReader(options.getPipelineOptions())); + return new BeamRecordReader<>( + boundedSource.createReader(options.getPipelineOptions()), tupleTag); } @Override @@ -142,6 +156,7 @@ public class BeamInputFormat<T> extends InputFormat { ByteArrayOutputStream stream = new ByteArrayOutputStream(); SerializableCoder.of(BoundedSource.class).encode(boundedSource, stream); SerializableCoder.of(SerializedPipelineOptions.class).encode(options, stream); + SerializableCoder.of(TupleTag.class).encode(tupleTag, stream); byte[] bytes = stream.toByteArray(); out.writeInt(bytes.length); @@ -157,16 +172,19 @@ public class BeamInputFormat<T> extends InputFormat { ByteArrayInputStream inStream = new ByteArrayInputStream(bytes); boundedSource = SerializableCoder.of(BoundedSource.class).decode(inStream); options = SerializableCoder.of(SerializedPipelineOptions.class).decode(inStream); + tupleTag = SerializableCoder.of(TupleTag.class).decode(inStream); } } private static class BeamRecordReader<T> extends RecordReader { private final BoundedSource.BoundedReader<T> reader; + private TupleTag<?> tupleTag; private boolean started; - public BeamRecordReader(BoundedSource.BoundedReader<T> reader) { + public BeamRecordReader(BoundedSource.BoundedReader<T> reader, TupleTag<?> tupleTag) { this.reader = checkNotNull(reader, "reader"); + this.tupleTag = checkNotNull(tupleTag, "tupleTag"); this.started = false; } @@ -187,13 +205,19 @@ public class BeamInputFormat<T> extends InputFormat { @Override public Object getCurrentKey() throws IOException, InterruptedException { - return "global"; + return tupleTag; } @Override public Object getCurrentValue() throws IOException, InterruptedException { - return WindowedValue.timestampedValueInGlobalWindow( - reader.getCurrent(), reader.getCurrentTimestamp()); + // TODO: this is a hack to handle that reads from materialized PCollections + // already return WindowedValue. + if (reader.getCurrent() instanceof WindowedValue) { + return reader.getCurrent(); + } else { + return WindowedValue.timestampedValueInGlobalWindow( + reader.getCurrent(), reader.getCurrentTimestamp()); + } } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java index d3ebb5c..b03236f 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.io.IOException; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.TaskInputOutputContext; @@ -57,7 +58,7 @@ public class BeamMapper<ValueInT, ValueOutT> Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) throws IOException, InterruptedException { LOG.info("key: {} value: {}.", key, value); - operation.process(value); + operation.process(WindowedValue.valueInGlobalWindow(KV.of(key, value))); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java index 674e30a..6bd893a 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -45,8 +46,12 @@ import org.apache.hadoop.io.SequenceFile; */ public class FileReadOperation<T> extends SourceOperation<WindowedValue<T>> { - public FileReadOperation(int producerStageId, String fileName, Coder<T> coder) { - super(new FileBoundedSource<>(producerStageId, fileName, coder)); + public FileReadOperation( + int producerStageId, + String fileName, + Coder<T> coder, + TupleTag<?> tupleTag) { + super(new FileBoundedSource<>(producerStageId, fileName, coder), tupleTag); } private static class FileBoundedSource<T> extends BoundedSource<WindowedValue<T>> { http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java index 468856a..af2e134 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileWriteOperation.java @@ -24,7 +24,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.TaskInputOutputContext; @@ -39,13 +38,10 @@ public class FileWriteOperation<T> extends Operation<T> { private final Coder<WindowedValue<T>> coder; private transient MultipleOutputs mos; - public FileWriteOperation(String fileName, Coder<T> coder) { + public FileWriteOperation(String fileName, Coder<WindowedValue<T>> coder) { super(0); this.fileName = checkNotNull(fileName, "fileName"); - checkNotNull(coder, "coder"); - // TODO: should not hard-code windows coder. - this.coder = WindowedValue.getFullCoder( - coder, WindowingStrategy.globalDefault().getWindowFn().windowCoder()); + this.coder = checkNotNull(coder, "coder"); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java index 13d215f..7c76823 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java @@ -17,9 +17,20 @@ */ package org.apache.beam.runners.mapreduce.translation; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import java.util.ArrayList; import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; /** * Class that optimizes the initial graph to a fused graph. @@ -39,21 +50,26 @@ public class GraphPlanner { continue; } Graphs.Step producer = fusedStep.getProducer(tag); - if (producer.getOperation() instanceof ViewOperation) { + if (producer.getOperation() instanceof FileWriteOperation) { continue; } String tagName = tag.getName(); String fileName = tagName.replaceAll("[^A-Za-z0-9]", "0"); + + // TODO: should not hard-code windows coder. + WindowedValue.WindowedValueCoder<?> writeValueCoder = WindowedValue.getFullCoder( + tag.getCoder(), WindowingStrategy.globalDefault().getWindowFn().windowCoder()); + fusedStep.addStep( Graphs.Step.of( tagName + "/Write", - new FileWriteOperation(fileName, tag.getCoder())), + new FileWriteOperation(fileName, writeValueCoder)), ImmutableList.of(tag), ImmutableList.<Graphs.Tag>of()); String readStepName = tagName + "/Read"; Graphs.Tag readOutput = Graphs.Tag.of( - readStepName + ".out", new TupleTag<>(), tag.getCoder()); + readStepName + ".out", tag.getTupleTag(), tag.getCoder()); for (Graphs.FusedStep consumer : consumers) { // Re-direct tag to readOutput. List<Graphs.Step> receivers = consumer.getConsumers(tag); @@ -64,13 +80,40 @@ public class GraphPlanner { consumer.addStep( Graphs.Step.of( readStepName, - new FileReadOperation(fusedStep.getStageId(), fileName, tag.getCoder())), + new FileReadOperation( + fusedStep.getStageId(), fileName, tag.getCoder(), tag.getTupleTag())), ImmutableList.<Graphs.Tag>of(), ImmutableList.of(readOutput)); } } } + // Insert PartitionOperation + for (final Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) { + List<Graphs.Step> readSteps = fusedStep.getStartSteps(); + + List<SourceOperation.TaggedSource> sources = new ArrayList<>(); + List<Graphs.Tag> readOutTags = new ArrayList<>(); + List<TupleTag<?>> readOutTupleTags = new ArrayList<>(); + StringBuilder partitionStepName = new StringBuilder(); + for (Graphs.Step step : readSteps) { + checkState(step.getOperation() instanceof SourceOperation); + sources.add(((SourceOperation) step.getOperation()).getTaggedSource()); + Graphs.Tag tag = Iterables.getOnlyElement(fusedStep.getOutputTags(step)); + readOutTags.add(tag); + readOutTupleTags.add(tag.getTupleTag()); + partitionStepName.append(step.getFullName()); + + fusedStep.removeStep(step); + } + if (partitionStepName.length() > 0) { + partitionStepName.deleteCharAt(partitionStepName.length() - 1); + } + + Graphs.Step partitionStep = + Graphs.Step.of(partitionStepName.toString(), new PartitionOperation(sources)); + fusedStep.addStep(partitionStep, ImmutableList.<Graphs.Tag>of(), readOutTags); + } return fusedGraph; } } http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java index 97b5441..9743d09 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java @@ -81,7 +81,7 @@ public class Graphs { Graphs.Step step, List<Graphs.Tag> inTags, List<Graphs.Tag> outTags) { - if (step.getOperation() instanceof ViewOperation) { + if (step.getOperation() instanceof FileWriteOperation) { return false; } if (outTags.size() != 1) { http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java index 1016e22..677f3a7 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java @@ -20,8 +20,6 @@ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import java.io.IOException; @@ -30,7 +28,6 @@ import java.util.Collections; import java.util.List; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; @@ -81,23 +78,14 @@ public class JobPrototype { String.format("/tmp/mapreduce/stage-%d", fusedStep.getStageId())); // Setup BoundedSources in BeamInputFormat. - // TODO: support more than one read steps by introducing a composed BeamInputFormat - // and a partition operation. - List<Graphs.Step> readSteps = fusedStep.getStartSteps(); - ArrayList<BoundedSource<?>> sources = new ArrayList<>(); - sources.addAll( - FluentIterable.from(readSteps) - .transform(new Function<Graphs.Step, BoundedSource<?>>() { - @Override - public BoundedSource<?> apply(Graphs.Step step) { - checkState(step.getOperation() instanceof SourceOperation); - return ((SourceOperation) step.getOperation()).getSource(); - }}) - .toList()); + Graphs.Step startStep = Iterables.getOnlyElement(fusedStep.getStartSteps()); + checkState(startStep.getOperation() instanceof PartitionOperation); + PartitionOperation partitionOperation = (PartitionOperation) startStep.getOperation(); conf.set( BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE, - Base64.encodeBase64String(SerializableUtils.serializeToByteArray(sources))); + Base64.encodeBase64String(SerializableUtils.serializeToByteArray( + new ArrayList<>(partitionOperation.getTaggedSources())))); conf.set( BeamInputFormat.BEAM_SERIALIZED_PIPELINE_OPTIONS, Base64.encodeBase64String(SerializableUtils.serializeToByteArray( @@ -151,16 +139,14 @@ public class JobPrototype { } // Setup DoFns in BeamMapper. - Graphs.Tag readOutputTag = Iterables.getOnlyElement(fusedStep.getOutputTags(readSteps.get(0))); - Graphs.Step mapperStartStep = Iterables.getOnlyElement(fusedStep.getConsumers(readOutputTag)); - chainOperations(mapperStartStep, fusedStep); + chainOperations(startStep, fusedStep); job.setMapOutputKeyClass(BytesWritable.class); job.setMapOutputValueClass(byte[].class); conf.set( BeamMapper.BEAM_PAR_DO_OPERATION_MAPPER, Base64.encodeBase64String( - SerializableUtils.serializeToByteArray(mapperStartStep.getOperation()))); + SerializableUtils.serializeToByteArray(startStep.getOperation()))); job.setMapperClass(BeamMapper.class); job.setOutputFormatClass(TextOutputFormat.class); http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java index 574f152..7504e1c 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java @@ -76,6 +76,7 @@ public abstract class Operation<T> implements Serializable { } public List<OutputReceiver> getOutputReceivers() { + // TODO: avoid allocating objects for each output emit. return ImmutableList.copyOf(receivers); } http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java new file mode 100644 index 0000000..b8aefd6 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import java.io.IOException; +import java.util.List; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Operation that partitions input elements based on their {@link TupleTag} keys. + */ +public class PartitionOperation extends Operation<KV<TupleTag<?>, Object>> { + + private final List<SourceOperation.TaggedSource> sources; + private final List<TupleTag<?>> tupleTags; + + public PartitionOperation(List<SourceOperation.TaggedSource> sources) { + super(sources.size()); + this.sources = checkNotNull(sources, "sources"); + this.tupleTags = FluentIterable.from(sources) + .transform(new Function<SourceOperation.TaggedSource, TupleTag<?>>() { + @Override + public TupleTag<?> apply(SourceOperation.TaggedSource input) { + return input.getTag(); + }}) + .toList(); + } + + public List<SourceOperation.TaggedSource> getTaggedSources() { + return sources; + } + + @Override + public void process(WindowedValue<KV<TupleTag<?>, Object>> elem) throws IOException, + InterruptedException { + TupleTag<?> tupleTag = elem.getValue().getKey(); + int outputIndex = getOutputIndex(tupleTag); + OutputReceiver receiver = getOutputReceivers().get(outputIndex); + receiver.process((WindowedValue<?>) elem.getValue().getValue()); + } + + @Override + protected int getOutputIndex(TupleTag<?> tupleTag) { + int index = tupleTags.indexOf(tupleTag); + checkState( + index >= 0, + String.format("Cannot find index for tuple tag: %s.", tupleTag)); + return index; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java index 86ee78a..e93986b 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java @@ -27,7 +27,8 @@ class ReadBoundedTranslator<T> extends TransformTranslator.Default<Read.Bounded< public void translateNode(Read.Bounded transform, TranslationContext context) { TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - SourceOperation operation = new SourceOperation(transform.getSource()); + SourceOperation operation = + new SourceOperation(transform.getSource(), userGraphContext.getOnlyOutputTag()); context.addInitStep( Graphs.Step.of(userGraphContext.getStepName(), operation), userGraphContext.getInputTags(), http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java index 2163f34..4ac850f 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceOperation.java @@ -19,18 +19,23 @@ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.auto.value.AutoValue; +import java.io.Serializable; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; /** * A Read.Bounded place holder {@link Operation} during pipeline translation. */ class SourceOperation<T> extends Operation<T> { - private final BoundedSource<T> source; + private final TaggedSource source; - SourceOperation(BoundedSource<T> source) { + SourceOperation(BoundedSource<?> boundedSource, TupleTag<?> tupleTag) { super(1); - this.source = checkNotNull(source, "source"); + checkNotNull(boundedSource, "boundedSource"); + checkNotNull(tupleTag, "tupleTag"); + this.source = TaggedSource.of(boundedSource, tupleTag); } @Override @@ -39,7 +44,18 @@ class SourceOperation<T> extends Operation<T> { String.format("%s should not in execution graph.", this.getClass().getSimpleName())); } - BoundedSource<?> getSource() { + TaggedSource getTaggedSource() { return source; } + + @AutoValue + abstract static class TaggedSource implements Serializable { + abstract BoundedSource<?> getSource(); + abstract TupleTag<?> getTag(); + + static TaggedSource of(BoundedSource<?> boundedSource, TupleTag<?> tupleTag) { + return new org.apache.beam.runners.mapreduce.translation + .AutoValue_SourceOperation_TaggedSource(boundedSource, tupleTag); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java deleted file mode 100644 index 093f00e..0000000 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.mapreduce.translation; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.base.Throwables; -import java.io.ByteArrayOutputStream; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapreduce.TaskInputOutputContext; - -/** - * {@link Operation} that materializes views. - */ -public class ViewOperation<T> extends Operation<T> { - - private final Coder<WindowedValue<T>> valueCoder; - - private transient TaskInputOutputContext<Object, Object, Object, Object> taskContext; - - public ViewOperation(Coder<WindowedValue<T>> valueCoder) { - super(0); - this.valueCoder = checkNotNull(valueCoder, "valueCoder"); - } - - @Override - public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) { - this.taskContext = checkNotNull(taskContext, "taskContext"); - } - - @Override - public void process(WindowedValue<T> elem) { - try { - ByteArrayOutputStream valueStream = new ByteArrayOutputStream(); - valueCoder.encode(elem, valueStream); - taskContext.write(new BytesWritable("view".getBytes()), valueStream.toByteArray()); - } catch (Exception e) { - Throwables.throwIfUnchecked(e); - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java index d5eac73..dfa18c8 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java @@ -17,11 +17,14 @@ */ package org.apache.beam.runners.mapreduce.translation; -import org.apache.beam.sdk.coders.Coder; +import com.google.common.collect.Iterables; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowingStrategy; /** - * Translates a {@link View.CreatePCollectionView} to a {@link ViewOperation}. + * Translates a {@link View.CreatePCollectionView} to a {@link FileWriteOperation}. */ public class ViewTranslator extends TransformTranslator.Default<View.CreatePCollectionView<?, ?>> { @@ -30,8 +33,16 @@ public class ViewTranslator extends TransformTranslator.Default<View.CreatePColl View.CreatePCollectionView<?, ?> transform, TranslationContext context) { TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - ViewOperation<?> operation = - new ViewOperation<>((Coder) transform.getView().getPCollection().getCoder()); + PCollection<?> inPCollection = transform.getView().getPCollection(); + WindowingStrategy<?, ?> windowingStrategy = inPCollection.getWindowingStrategy(); + + Graphs.Tag outTag = Iterables.getOnlyElement(userGraphContext.getOutputTags()); + String fileName = outTag.getName().replaceAll("[^A-Za-z0-9]", "0"); + + FileWriteOperation<?> operation = new FileWriteOperation<>( + fileName, + WindowedValue.getFullCoder( + inPCollection.getCoder(), windowingStrategy.getWindowFn().windowCoder())); context.addInitStep( Graphs.Step.of(userGraphContext.getStepName(), operation), userGraphContext.getInputTags(),