http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java index 576c6bf..c336a70 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java @@ -1,27 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; -import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; @@ -33,22 +42,25 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; /** - * Created by peihe on 24/07/2017. + * Class that translates a {@link Graphs.FusedStep} to a MapReduce job. 
*/ public class JobPrototype { - public static JobPrototype create(int stageId, Graph.Vertex vertex) { - return new JobPrototype(stageId, vertex); + public static JobPrototype create( + int stageId, Graphs.FusedStep fusedStep, PipelineOptions options) { + return new JobPrototype(stageId, fusedStep, options); } private final int stageId; - private final Graph.Vertex vertex; + private final Graphs.FusedStep fusedStep; private final Set<JobPrototype> dependencies; + private final PipelineOptions options; - private JobPrototype(int stageId, Graph.Vertex vertex) { + private JobPrototype(int stageId, Graphs.FusedStep fusedStep, PipelineOptions options) { this.stageId = stageId; - this.vertex = checkNotNull(vertex, "vertex"); + this.fusedStep = checkNotNull(fusedStep, "fusedStep"); this.dependencies = Sets.newHashSet(); + this.options = checkNotNull(options, "options"); } public Job build(Class<?> jarClass, Configuration conf) throws IOException { @@ -57,168 +69,101 @@ public class JobPrototype { job.setJarByClass(jarClass); conf.set( "io.serializations", - "org.apache.hadoop.io.serializer.WritableSerialization," + - "org.apache.hadoop.io.serializer.JavaSerialization"); + "org.apache.hadoop.io.serializer.WritableSerialization," + + "org.apache.hadoop.io.serializer.JavaSerialization"); // Setup BoundedSources in BeamInputFormat. - // TODO: support more than one in-edge - Graph.Edge inEdge = Iterables.getOnlyElement(vertex.getIncoming()); - Graph.Vertex head = inEdge.getHead(); - Graph.Step headStep = head.getStep(); - checkState(headStep.getTransform() instanceof Read.Bounded); - Read.Bounded read = (Read.Bounded) headStep.getTransform(); + // TODO: support more than one read steps by introducing a composed BeamInputFormat + // and a partition operation. 
+ Graphs.Step readStep = Iterables.getOnlyElement(fusedStep.getStartSteps()); + checkState(readStep.getOperation() instanceof ReadOperation); + BoundedSource source = ((ReadOperation) readStep.getOperation()).getSource(); conf.set( BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE, - Base64.encodeBase64String(SerializableUtils.serializeToByteArray(read.getSource()))); + Base64.encodeBase64String(SerializableUtils.serializeToByteArray(source))); job.setInputFormatClass(BeamInputFormat.class); - // Setup DoFns in BeamMapper. - // TODO: support more than one in-path. - Graph.NodePath inPath = Iterables.getOnlyElement(inEdge.getPaths()); - - Operation mapperParDoRoot = chainParDosInPath(inPath); - Operation mapperParDoTail = getTailOperation(mapperParDoRoot); - Graph.Step vertexStep = vertex.getStep(); - if (vertexStep.getTransform() instanceof ParDo.SingleOutput - || vertexStep.getTransform() instanceof ParDo.MultiOutput - || vertexStep.getTransform() instanceof Window.Assign) { - // TODO: add a TailVertex type to simplify the translation. 
- Operation vertexParDo = translateToOperation(vertexStep); - Operation mapperWrite = new WriteOperation( - getKeyCoder(inEdge.getCoder()), - getReifyValueCoder(inEdge.getCoder(), vertexStep.getWindowingStrategy())); - mapperParDoTail.attachOutput(vertexParDo, 0); - vertexParDo.attachOutput(mapperWrite, 0); - } else if (vertexStep.getTransform() instanceof GroupByKey) { - Operation reifyOperation = new ReifyTimestampAndWindowsParDoOperation( - PipelineOptionsFactory.create(), - new TupleTag<>(), - ImmutableList.<TupleTag<?>>of(), - vertexStep.getWindowingStrategy()); - Operation mapperWrite = new WriteOperation( - getKeyCoder(inEdge.getCoder()), - getReifyValueCoder(inEdge.getCoder(), vertexStep.getWindowingStrategy())); - mapperParDoTail.attachOutput(reifyOperation, 0); - reifyOperation.attachOutput(mapperWrite, 0); - } else { - throw new UnsupportedOperationException("Transform: " + vertexStep.getTransform()); - } - job.setMapOutputKeyClass(BytesWritable.class); - job.setMapOutputValueClass(byte[].class); - conf.set( - BeamMapper.BEAM_PAR_DO_OPERATION_MAPPER, - Base64.encodeBase64String(SerializableUtils.serializeToByteArray(mapperParDoRoot))); - job.setMapperClass(BeamMapper.class); + if (fusedStep.containsGroupByKey()) { + Graphs.Step groupByKey = fusedStep.getGroupByKeyStep(); + GroupByKeyOperation operation = (GroupByKeyOperation) groupByKey.getOperation(); + WindowingStrategy<?, ?> windowingStrategy = operation.getWindowingStrategy(); + KvCoder<?, ?> kvCoder = operation.getKvCoder(); + + Coder<?> reifyValueCoder = getReifyValueCoder(kvCoder.getValueCoder(), windowingStrategy); + Graphs.Tag reifyOutputTag = Graphs.Tag.of(new TupleTag<Object>(), reifyValueCoder); + Graphs.Step reifyStep = Graphs.Step.of( + groupByKey.getFullName() + "-Reify", + new ReifyTimestampAndWindowsParDoOperation(options, operation.getWindowingStrategy()), + groupByKey.getInputTags(), + ImmutableList.of(reifyOutputTag)); + + Graphs.Step writeStep = Graphs.Step.of( + 
groupByKey.getFullName() + "-Write", + new WriteOperation(kvCoder.getKeyCoder(), reifyValueCoder), + ImmutableList.of(reifyOutputTag), + Collections.<Graphs.Tag>emptyList()); + + Graphs.Step gabwStep = Graphs.Step.of( + groupByKey.getFullName() + "-GroupAlsoByWindows", + new GroupAlsoByWindowsParDoOperation(options, windowingStrategy, kvCoder), + Collections.<Graphs.Tag>emptyList(), + groupByKey.getOutputTags()); + + fusedStep.addStep(reifyStep); + fusedStep.addStep(writeStep); + fusedStep.addStep(gabwStep); + fusedStep.removeStep(groupByKey); - if (vertexStep.getTransform() instanceof GroupByKey) { // Setup BeamReducer - Operation gabwOperation = new GroupAlsoByWindowsParDoOperation( - PipelineOptionsFactory.create(), - (TupleTag<Object>) vertexStep.getOutputs().iterator().next(), - ImmutableList.<TupleTag<?>>of(), - vertexStep.getWindowingStrategy(), - inEdge.getCoder()); - Graph.Edge outEdge = Iterables.getOnlyElement(vertex.getOutgoing()); - Graph.NodePath outPath = Iterables.getOnlyElement(outEdge.getPaths()); - Operation reducerParDoRoot = chainParDosInPath(outPath); - Operation reducerParDoTail = getTailOperation(reducerParDoRoot); - - Operation reducerTailParDo = translateToOperation(outEdge.getTail().getStep()); - if (reducerParDoRoot == null) { - gabwOperation.attachOutput(reducerTailParDo, 0); - } else { - gabwOperation.attachOutput(reducerParDoRoot, 0); - reducerParDoTail.attachOutput(reducerTailParDo, 0); - } + Graphs.Step reducerStartStep = gabwStep; + chainOperations(reducerStartStep, fusedStep); conf.set( BeamReducer.BEAM_REDUCER_KV_CODER, Base64.encodeBase64String(SerializableUtils.serializeToByteArray( - KvCoder.of( - getKeyCoder(inEdge.getCoder()), - getReifyValueCoder(inEdge.getCoder(), vertexStep.getWindowingStrategy()))))); + KvCoder.of(kvCoder.getKeyCoder(), reifyValueCoder)))); conf.set( BeamReducer.BEAM_PAR_DO_OPERATION_REDUCER, - Base64.encodeBase64String(SerializableUtils.serializeToByteArray(gabwOperation))); + Base64.encodeBase64String( 
+ SerializableUtils.serializeToByteArray(reducerStartStep.getOperation()))); job.setReducerClass(BeamReducer.class); } - job.setOutputFormatClass(NullOutputFormat.class); - return job; - } + // Setup DoFns in BeamMapper. + Graphs.Tag readOutputTag = Iterables.getOnlyElement(readStep.getOutputTags()); + Graphs.Step mapperStartStep = Iterables.getOnlyElement(fusedStep.getConsumers(readOutputTag)); + chainOperations(mapperStartStep, fusedStep); - private Coder<Object> getKeyCoder(Coder<?> coder) { - KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) checkNotNull(coder, "coder"); - return kvCoder.getKeyCoder(); - } + job.setMapOutputKeyClass(BytesWritable.class); + job.setMapOutputValueClass(byte[].class); + conf.set( + BeamMapper.BEAM_PAR_DO_OPERATION_MAPPER, + Base64.encodeBase64String( + SerializableUtils.serializeToByteArray(mapperStartStep.getOperation()))); + job.setMapperClass(BeamMapper.class); - private Coder<Object> getReifyValueCoder( - Coder<?> coder, WindowingStrategy<?, ?> windowingStrategy) { - KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) checkNotNull(coder, "coder"); - return (Coder) WindowedValue.getFullCoder( - kvCoder.getValueCoder(), windowingStrategy.getWindowFn().windowCoder()); - } + job.setOutputFormatClass(NullOutputFormat.class); - private Operation getTailOperation(@Nullable Operation operation) { - if (operation == null) { - return null; - } - if (operation.getOutputReceivers().isEmpty()) { - return operation; - } - OutputReceiver receiver = Iterables.getOnlyElement(operation.getOutputReceivers()); - if (receiver.getReceivingOperations().isEmpty()) { - return operation; - } - return getTailOperation(Iterables.getOnlyElement(receiver.getReceivingOperations())); + return job; } - private Operation chainParDosInPath(Graph.NodePath path) { - List<Graph.Step> parDos = new ArrayList<>(); - // TODO: we should not need this filter. 
- parDos.addAll(FluentIterable.from(path.steps()) - .filter(new Predicate<Graph.Step>() { - @Override - public boolean apply(Graph.Step input) { - PTransform<?, ?> transform = input.getTransform(); - return !(transform instanceof Read.Bounded); - }}) - .toList()); - - Operation root = null; - Operation prev = null; - for (Graph.Step step : parDos) { - Operation current = translateToOperation(step); - if (prev == null) { - root = current; - } else { - // TODO: set a proper outputNum for ParDo.MultiOutput instead of zero. - prev.attachOutput(current, 0); + private void chainOperations(Graphs.Step current, Graphs.FusedStep fusedStep) { + Operation<?> operation = current.getOperation(); + List<Graphs.Tag> outputTags = current.getOutputTags(); + for (int index = 0; index < outputTags.size(); ++index) { + for (Graphs.Step consumer : fusedStep.getConsumers(outputTags.get(index))) { + operation.attachConsumer(index, consumer.getOperation()); + } + } + for (Graphs.Tag outTag : outputTags) { + for (Graphs.Step consumer : fusedStep.getConsumers(outTag)) { + chainOperations(consumer, fusedStep); } - prev = current; } - return root; } - private Operation translateToOperation(Graph.Step parDoStep) { - PTransform<?, ?> transform = parDoStep.getTransform(); - DoFn<Object, Object> doFn; - if (transform instanceof ParDo.SingleOutput) { - return new NormalParDoOperation( - ((ParDo.SingleOutput) transform).getFn(), - PipelineOptionsFactory.create(), - (TupleTag<Object>) parDoStep.getOutputs().iterator().next(), - ImmutableList.<TupleTag<?>>of(), - parDoStep.getWindowingStrategy()); - } else if (transform instanceof ParDo.MultiOutput) { - return new NormalParDoOperation( - ((ParDo.MultiOutput) transform).getFn(), - PipelineOptionsFactory.create(), - (TupleTag<Object>) parDoStep.getOutputs().iterator().next(), - ImmutableList.<TupleTag<?>>of(), - parDoStep.getWindowingStrategy()); - } else if (transform instanceof Window.Assign) { - return new WindowAssignOperation<>(1, 
parDoStep.getWindowingStrategy().getWindowFn()); - } else { - throw new UnsupportedOperationException("Transform: " + transform); - } + private Coder<Object> getReifyValueCoder( + Coder<?> valueCoder, WindowingStrategy<?, ?> windowingStrategy) { + // TODO: do we need the full coder to encode windows? + return (Coder) WindowedValue.getFullCoder( + valueCoder, windowingStrategy.getWindowFn().windowCoder()); + } }
http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java index 1da39a9..fd1b528 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java @@ -26,16 +26,16 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; /** - * Created by peihe on 26/07/2017. + * {@link Operation} that executes a {@link DoFn}. */ -public class NormalParDoOperation extends ParDoOperation { +public class NormalParDoOperation<InputT, OutputT> extends ParDoOperation<InputT, OutputT> { - private final DoFn<Object, Object> doFn; + private final DoFn<InputT, OutputT> doFn; public NormalParDoOperation( - DoFn<Object, Object> doFn, + DoFn<InputT, OutputT> doFn, PipelineOptions options, - TupleTag<Object> mainOutputTag, + TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, WindowingStrategy<?, ?> windowingStrategy) { super(options, mainOutputTag, sideOutputTags, windowingStrategy); @@ -43,7 +43,7 @@ public class NormalParDoOperation extends ParDoOperation { } @Override - DoFn<Object, Object> getDoFn() { + DoFn<InputT, OutputT> getDoFn() { return doFn; } } http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java index 6951909..187ea79 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java @@ -1,14 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.mapreduce.translation; import com.google.common.collect.ImmutableList; import java.io.Serializable; import java.util.List; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.hadoop.mapreduce.TaskInputOutputContext; /** - * Created by peihe on 26/07/2017. + * Class that processes elements and forwards outputs to consumers. */ -public abstract class Operation implements Serializable { +public abstract class Operation<T> implements Serializable { private final OutputReceiver[] receivers; public Operation(int numOutputs) { @@ -37,7 +55,7 @@ public abstract class Operation implements Serializable { /** * Processes the element. */ - public abstract void process(Object elem); + public abstract void process(WindowedValue<T> elem); /** * Finishes this Operation's execution. 
@@ -62,8 +80,8 @@ public abstract class Operation implements Serializable { /** * Adds an output to this Operation. */ - public void attachOutput(Operation output, int outputNum) { - OutputReceiver fanOut = receivers[outputNum]; - fanOut.addOutput(output); + public void attachConsumer(int outputIndex, Operation consumer) { + OutputReceiver fanOut = receivers[outputIndex]; + fanOut.addOutput(consumer); } } http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java index 6aeefd2..3dab890 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import org.apache.beam.sdk.util.WindowedValue; /** * OutputReceiver that forwards each input it receives to each of a list of down stream operations. @@ -42,7 +43,7 @@ public class OutputReceiver implements Serializable { /** * Processes the element. 
*/ - public void process(Object elem) { + public void process(WindowedValue<?> elem) { for (Operation out : receivingOperations) { if (out != null) { out.process(elem); http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java index 2627d20..a76773f 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java @@ -36,19 +36,19 @@ import org.slf4j.LoggerFactory; /** * Operation for ParDo. */ -public abstract class ParDoOperation extends Operation { +public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> { private static final Logger LOG = LoggerFactory.getLogger(ParDoOperation.class); protected final SerializedPipelineOptions options; - protected final TupleTag<Object> mainOutputTag; + protected final TupleTag<OutputT> mainOutputTag; private final List<TupleTag<?>> sideOutputTags; protected final WindowingStrategy<?, ?> windowingStrategy; - private DoFnRunner<Object, Object> fnRunner; + private DoFnRunner<InputT, OutputT> fnRunner; public ParDoOperation( PipelineOptions options, - TupleTag<Object> mainOutputTag, + TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, WindowingStrategy<?, ?> windowingStrategy) { super(1 + sideOutputTags.size()); @@ -61,7 +61,7 @@ public abstract class ParDoOperation extends Operation { /** * Returns a {@link DoFn} for processing inputs. 
*/ - abstract DoFn<Object, Object> getDoFn(); + abstract DoFn<InputT, OutputT> getDoFn(); @Override public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) { @@ -82,9 +82,9 @@ public abstract class ParDoOperation extends Operation { * Processes the element. */ @Override - public void process(Object elem) { + public void process(WindowedValue<InputT> elem) { LOG.info("elem: {}.", elem); - fnRunner.processElement((WindowedValue<Object>) elem); + fnRunner.processElement(elem); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java new file mode 100644 index 0000000..1a1373a --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.mapreduce.translation; + +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +/** + * Translates a {@link ParDo} to a {@link Operation}. + */ +class ParDoTranslator<InputT, OutputT> + extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> { + + @Override + public void translateNode( + ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + NormalParDoOperation operation = new NormalParDoOperation( + transform.getFn(), + userGraphContext.getOptions(), + transform.getMainOutputTag(), + transform.getAdditionalOutputTags().getAll(), + ((PCollection) userGraphContext.getInput()).getWindowingStrategy()); + + context.addInitStep(Graphs.Step.of( + userGraphContext.getStepName(), + operation, + userGraphContext.getInputTags(), + userGraphContext.getOutputTags())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java new file mode 100644 index 0000000..0710827 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import org.apache.beam.sdk.io.Read; + +/** + * Translates a {@link Read.Bounded} to a {@link ReadOperation}. + */ +class ReadBoundedTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> { + @Override + public void translateNode(Read.Bounded transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + + ReadOperation operation = new ReadOperation(transform.getSource()); + context.addInitStep(Graphs.Step.of( + userGraphContext.getStepName(), + operation, + userGraphContext.getInputTags(), + userGraphContext.getOutputTags())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java new file mode 100644 index 0000000..c199dc6 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * A Read.Bounded placeholder {@link Operation} during pipeline translation.
+ */ +class ReadOperation<T> extends Operation<T> { + private final BoundedSource<T> source; + + ReadOperation(BoundedSource<T> source) { + super(1); + this.source = checkNotNull(source, "source"); + } + + @Override + public void process(WindowedValue elem) { + throw new IllegalStateException( + String.format("%s should not in execution graph.", this.getClass().getSimpleName())); + } + + BoundedSource<?> getSource() { + return source; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java index ec954bb..83d1af5 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java @@ -1,5 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.mapreduce.translation; +import com.google.common.collect.ImmutableList; import java.util.List; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; @@ -10,16 +28,14 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; /** - * Created by peihe on 27/07/2017. + * {@link Operation} that executes {@link ReifyTimestampAndWindowsDoFn}. */ public class ReifyTimestampAndWindowsParDoOperation extends ParDoOperation { public ReifyTimestampAndWindowsParDoOperation( PipelineOptions options, - TupleTag<Object> mainOutputTag, - List<TupleTag<?>> sideOutputTags, WindowingStrategy<?, ?> windowingStrategy) { - super(options, mainOutputTag, sideOutputTags, windowingStrategy); + super(options, new TupleTag<>(), ImmutableList.<TupleTag<?>>of(), windowingStrategy); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TransformTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TransformTranslator.java new file mode 100644 index 0000000..f495372 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TransformTranslator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import org.apache.beam.sdk.transforms.PTransform; + +/** + * Interface for classes capable of tranforming Beam PTransforms into Storm primitives. + */ +interface TransformTranslator<T extends PTransform<?, ?>> { + + void translateNode(T transform, TranslationContext context); + + /** + * Returns true if this translator can translate the given transform. + */ + boolean canTranslate(T transform, TranslationContext context); + + /** + * Default translator. 
+ */ + class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> { + @Override + public void translateNode(T1 transform, TranslationContext context) { + + } + + @Override + public boolean canTranslate(T1 transform, TranslationContext context) { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java new file mode 100644 index 0000000..0df365e --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.mapreduce.translation; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.mapreduce.MapReducePipelineOptions; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Class that maintains contexts during translation. + */ +public class TranslationContext { + + private final UserGraphContext userGraphContext; + private final Graph<Graphs.Step, Graphs.Tag> initGraph; + + public TranslationContext(MapReducePipelineOptions options) { + this.userGraphContext = new UserGraphContext(options); + this.initGraph = new Graph<>(); + } + + public UserGraphContext getUserGraphContext() { + return userGraphContext; + } + + public void addInitStep(Graphs.Step step) { + initGraph.addStep(step); + } + + /** + * Returns {@link Graphs.Step steps} in reverse topological order. + */ + public Graph<Graphs.Step, Graphs.Tag> getInitGraph() { + return initGraph; + } + + /** + * Context of user graph. 
+ */ + public static class UserGraphContext { + private final MapReducePipelineOptions options; + private final Map<PValue, TupleTag<?>> pValueToTupleTag; + private TransformHierarchy.Node currentNode; + + public UserGraphContext(MapReducePipelineOptions options) { + this.options = checkNotNull(options, "options"); + this.pValueToTupleTag = Maps.newHashMap(); + this.currentNode = null; + } + + public MapReducePipelineOptions getOptions() { + return options; + } + + public void setCurrentNode(TransformHierarchy.Node node) { + this.currentNode = node; + for (Map.Entry<TupleTag<?>, PValue> entry : currentNode.getOutputs().entrySet()) { + pValueToTupleTag.put(entry.getValue(), entry.getKey()); + } + } + + public String getStepName() { + return currentNode.getFullName(); + } + + public PValue getInput() { + return Iterables.get(currentNode.getInputs().values(), 0); + } + + public PValue getOutput() { + return Iterables.get(currentNode.getOutputs().values(), 0); + } + + public List<Graphs.Tag> getInputTags() { + return FluentIterable.from(currentNode.getInputs().values()) + .transform(new Function<PValue, Graphs.Tag>() { + @Override + public Graphs.Tag apply(PValue pValue) { + checkState( + pValueToTupleTag.containsKey(pValue), + String.format("Failed to find TupleTag for pValue: %s.", pValue)); + return Graphs.Tag.of( + pValueToTupleTag.get(pValue), ((PCollection<?>) pValue).getCoder()); + }}) + .toList(); + } + + public List<Graphs.Tag> getOutputTags() { + return FluentIterable.from(currentNode.getOutputs().entrySet()) + .transform(new Function<Map.Entry<TupleTag<?>, PValue>, Graphs.Tag>() { + @Override + public Graphs.Tag apply(Map.Entry<TupleTag<?>, PValue> entry) { + return Graphs.Tag.of(entry.getKey(), ((PCollection<?>) entry.getValue()).getCoder()); + }}) + .toList(); + } + + public TupleTag<?> getOnlyOutputTag() { + return Iterables.getOnlyElement(currentNode.getOutputs().keySet()); + } + } +} 
http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java new file mode 100644 index 0000000..f79260a --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.mapreduce.translation; + +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Lookup table mapping PTransform types to associated TransformTranslator implementations. + */ +public class TranslatorRegistry { + + private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class); + + private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS = + new HashMap<>(); + + static { + TRANSLATORS.put(Read.Bounded.class, new ReadBoundedTranslator()); + TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslator()); + TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>()); + TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator()); + TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator()); + TRANSLATORS.put(View.CreatePCollectionView.class, new ViewTranslator()); + } + + public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) { + TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass()); + if (translator == null) { + LOG.warn("Unsupported operator={}", transform.getClass().getName()); + } + return translator; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java new file mode 100644 index 0000000..093f00e --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.Throwables; +import java.io.ByteArrayOutputStream; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + +/** + * {@link Operation} that materializes views. 
+ */ +public class ViewOperation<T> extends Operation<T> { + + private final Coder<WindowedValue<T>> valueCoder; + + private transient TaskInputOutputContext<Object, Object, Object, Object> taskContext; + + public ViewOperation(Coder<WindowedValue<T>> valueCoder) { + super(0); + this.valueCoder = checkNotNull(valueCoder, "valueCoder"); + } + + @Override + public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) { + this.taskContext = checkNotNull(taskContext, "taskContext"); + } + + @Override + public void process(WindowedValue<T> elem) { + try { + ByteArrayOutputStream valueStream = new ByteArrayOutputStream(); + valueCoder.encode(elem, valueStream); + taskContext.write(new BytesWritable("view".getBytes()), valueStream.toByteArray()); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java new file mode 100644 index 0000000..815ce77 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.View; + +/** + * Translates a {@link View.CreatePCollectionView} to a {@link ViewOperation}. + */ +public class ViewTranslator extends TransformTranslator.Default<View.CreatePCollectionView<?, ?>> { + + @Override + public void translateNode( + View.CreatePCollectionView<?, ?> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + + ViewOperation<?> operation = + new ViewOperation<>((Coder) transform.getView().getPCollection().getCoder()); + + context.addInitStep(Graphs.Step.of( + userGraphContext.getStepName(), + operation, + userGraphContext.getInputTags(), + userGraphContext.getOutputTags())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java index 144ef16..3279e11 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java @@ -1,3 +1,20 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkArgument; @@ -12,28 +29,24 @@ import org.apache.beam.sdk.util.WindowedValue; import org.joda.time.Instant; /** - * Created by peihe on 27/07/2017. + * {@link Operation} that executes for assigning windows to elements. 
*/ -public class WindowAssignOperation<T, W extends BoundedWindow> extends Operation { +public class WindowAssignOperation<T, W extends BoundedWindow> extends Operation<T> { private final WindowFn<T, W> windowFn; - public WindowAssignOperation(int numOutputs, WindowFn<T, W> windowFn) { - super(numOutputs); + public WindowAssignOperation(WindowFn<T, W> windowFn) { + super(1); this.windowFn = checkNotNull(windowFn, "windowFn"); } @Override - public void process(Object elem) { - WindowedValue windowedValue = (WindowedValue) elem; + public void process(WindowedValue<T> elem) { try { - Collection<W> windows = windowFn.assignWindows(new AssignContextInternal<>(windowFn, windowedValue)); + Collection<W> windows = windowFn.assignWindows(new AssignContextInternal<>(windowFn, elem)); for (W window : windows) { OutputReceiver receiver = Iterables.getOnlyElement(getOutputReceivers()); receiver.process(WindowedValue.of( - windowedValue.getValue(), - windowedValue.getTimestamp(), - window, - windowedValue.getPane())); + elem.getValue(), elem.getTimestamp(), window, elem.getPane())); } } catch (Exception e) { Throwables.throwIfUnchecked(e); http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java new file mode 100644 index 0000000..367c375 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import org.apache.beam.sdk.transforms.windowing.Window; + +/** + * Translates a {@link Window.Assign} to a {@link WindowAssignOperation}. + */ +public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> { + + @Override + public void translateNode(Window.Assign<T> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + + WindowAssignOperation<T, ?> operation = new WindowAssignOperation<>(transform.getWindowFn()); + context.addInitStep(Graphs.Step.of( + userGraphContext.getStepName(), + operation, + userGraphContext.getInputTags(), + userGraphContext.getOutputTags())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java index 0585032..2eb4684 100644 --- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; @@ -5,19 +22,15 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.Throwables; import java.io.ByteArrayOutputStream; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapreduce.TaskInputOutputContext; /** - * Created by peihe on 26/07/2017. + * {@link Operation} that materializes input for group by key. 
*/ -public class WriteOperation extends Operation { +public class WriteOperation<T> extends Operation<T> { private final Coder<Object> keyCoder; private final Coder<Object> valueCoder; @@ -36,14 +49,14 @@ public class WriteOperation extends Operation { } @Override - public void process(Object elem) { - WindowedValue<KV<?, ?>> windowedElem = (WindowedValue<KV<?, ?>>) elem; + public void process(WindowedValue<T> elem) { + KV<?, ?> kv = (KV<?, ?>) elem.getValue(); try { ByteArrayOutputStream keyStream = new ByteArrayOutputStream(); - keyCoder.encode(windowedElem.getValue().getKey(), keyStream); + keyCoder.encode(kv.getKey(), keyStream); ByteArrayOutputStream valueStream = new ByteArrayOutputStream(); - valueCoder.encode(windowedElem.getValue().getValue(), valueStream); + valueCoder.encode(kv.getValue(), valueStream); taskContext.write(new BytesWritable(keyStream.toByteArray()), valueStream.toByteArray()); } catch (Exception e) { Throwables.throwIfUnchecked(e); http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java index a548ba7..363ba01 100644 --- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java +++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.mapreduce; import org.apache.beam.sdk.Pipeline; @@ -10,7 +27,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.log4j.BasicConfigurator; @@ -75,11 +92,11 @@ public class WordCountTest { // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the // static FormatAsTextFn() to the ParDo transform. 
p.apply("ReadLines", TextIO.read().from(input)) - .apply(Window.<String>into(SlidingWindows.of(Duration.millis(100)))) + .apply(Window.<String>into(FixedWindows.of(Duration.millis(1000)))) .apply(ParDo.of(new ExtractWordsFn())) .apply(Count.<String>perElement()) - .apply(MapElements.via(new FormatAsTextFn())); - //.apply("WriteCounts", TextIO.write().to(output)); + .apply(MapElements.via(new FormatAsTextFn())) + .apply("WriteCounts", TextIO.write().to(output)); p.run(); } http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java index 4f0c283..76c8311 100644 --- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java +++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java @@ -1,12 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.mapreduce.translation; import static org.junit.Assert.assertEquals; import com.google.common.collect.Iterables; +import org.apache.beam.runners.mapreduce.MapReducePipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.CrashingRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; @@ -23,17 +43,18 @@ public class GraphConverterTest { @Test public void testCombine() throws Exception { - Pipeline p = Pipeline.create(); + MapReducePipelineOptions options = PipelineOptionsFactory.as(MapReducePipelineOptions.class); + options.setRunner(CrashingRunner.class); + Pipeline p = Pipeline.create(options); PCollection<KV<String, Integer>> input = p .apply(Create.empty(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) .apply(Sum.<String>integersPerKey()); - GraphConverter graphConverter = new GraphConverter(); + TranslationContext context = new TranslationContext(options); + GraphConverter graphConverter = new GraphConverter(context); p.traverseTopologically(graphConverter); - Graph graph = graphConverter.getGraph(); + Graph<Graphs.Step, Graphs.Tag> initGraph = context.getInitGraph(); - assertEquals(3, Iterables.size(graph.getAllVertices())); - assertEquals(2, Iterables.size(graph.getAllEdges())); - assertEquals(1, Iterables.size(graph.getLeafVertices())); + assertEquals(3, Iterables.size(initGraph.getSteps())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java 
---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java index c98f817..cf5262f 100644 --- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java +++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java @@ -1,12 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.beam.runners.mapreduce.translation; import static org.junit.Assert.assertEquals; import com.google.common.collect.Iterables; +import org.apache.beam.runners.mapreduce.MapReducePipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.CrashingRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; @@ -23,20 +43,21 @@ public class GraphPlannerTest { @Test public void testCombine() throws Exception { - Pipeline p = Pipeline.create(); + MapReducePipelineOptions options = PipelineOptionsFactory.as(MapReducePipelineOptions.class); + options.setRunner(CrashingRunner.class); + Pipeline p = Pipeline.create(options); PCollection<KV<String, Integer>> input = p .apply(Create.empty(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) .apply(Sum.<String>integersPerKey()); - GraphConverter graphConverter = new GraphConverter(); - p.traverseTopologically(graphConverter); - Graph graph = graphConverter.getGraph(); + TranslationContext context = new TranslationContext(options); + GraphConverter graphConverter = new GraphConverter(context); + p.traverseTopologically(graphConverter); GraphPlanner planner = new GraphPlanner(); - Graph fusedGraph = planner.plan(graph); + Graphs.FusedGraph fusedGraph = planner.plan(context.getInitGraph()); - assertEquals(3, Iterables.size(fusedGraph.getAllVertices())); - assertEquals(2, Iterables.size(fusedGraph.getAllEdges())); - assertEquals(1, Iterables.size(fusedGraph.getLeafVertices())); + assertEquals(1, Iterables.size(fusedGraph.getFusedSteps())); + assertEquals(3, Iterables.getOnlyElement(fusedGraph.getFusedSteps()).getSteps().size()); } }
