mr-runner: refactors and creates Graph data structures to handle general Beam pipelines.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16e63205 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16e63205 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16e63205 Branch: refs/heads/mr-runner Commit: 16e63205bee3ade711ebffc3c74e18aec6d50c01 Parents: ee1cce9 Author: Pei He <p...@apache.org> Authored: Fri Jul 28 16:31:41 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Aug 31 14:13:48 2017 +0800 ---------------------------------------------------------------------- .../mapreduce/MapReducePipelineOptions.java | 17 + .../runners/mapreduce/MapReduceRegistrar.java | 17 + .../beam/runners/mapreduce/MapReduceRunner.java | 48 ++- .../runners/mapreduce/MapReduceWordCount.java | 218 ---------- .../beam/runners/mapreduce/package-info.java | 21 + .../mapreduce/translation/BeamInputFormat.java | 21 +- .../mapreduce/translation/BeamMapper.java | 19 +- .../mapreduce/translation/BeamReducer.java | 20 +- .../mapreduce/translation/FlattenOperation.java | 37 ++ .../translation/FlattenTranslator.java | 37 ++ .../runners/mapreduce/translation/Graph.java | 400 +++++++------------ .../mapreduce/translation/GraphConverter.java | 108 ++--- .../mapreduce/translation/GraphPlanner.java | 142 ++----- .../mapreduce/translation/GraphVisitor.java | 11 - .../runners/mapreduce/translation/Graphs.java | 188 +++++++++ .../GroupAlsoByWindowsParDoOperation.java | 24 +- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 17 + .../translation/GroupByKeyOperation.java | 54 +++ .../translation/GroupByKeyTranslator.java | 46 +++ .../mapreduce/translation/JobPrototype.java | 257 +++++------- .../translation/NormalParDoOperation.java | 12 +- .../mapreduce/translation/Operation.java | 30 +- .../mapreduce/translation/OutputReceiver.java | 3 +- .../mapreduce/translation/ParDoOperation.java | 14 +- .../mapreduce/translation/ParDoTranslator.java | 46 +++ .../translation/ReadBoundedTranslator.java | 37 ++ 
.../mapreduce/translation/ReadOperation.java | 45 +++ .../ReifyTimestampAndWindowsParDoOperation.java | 24 +- .../translation/TransformTranslator.java | 48 +++ .../translation/TranslationContext.java | 128 ++++++ .../translation/TranslatorRegistry.java | 58 +++ .../mapreduce/translation/ViewOperation.java | 59 +++ .../mapreduce/translation/ViewTranslator.java | 42 ++ .../translation/WindowAssignOperation.java | 35 +- .../translation/WindowAssignTranslator.java | 38 ++ .../mapreduce/translation/WriteOperation.java | 33 +- .../beam/runners/mapreduce/WordCountTest.java | 25 +- .../translation/GraphConverterTest.java | 33 +- .../mapreduce/translation/GraphPlannerTest.java | 37 +- 39 files changed, 1568 insertions(+), 881 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java index 7fe66ba..73c7d47 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.mapreduce; import com.google.common.collect.ImmutableSet; http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java index eb960b8..c8b0eea 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.beam.runners.mapreduce; import com.google.auto.service.AutoService; http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java index 11ac9a7..b6a82d1 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java @@ -1,23 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.beam.runners.mapreduce; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.Throwables; -import org.apache.beam.runners.mapreduce.translation.Graph; +import org.apache.beam.runners.mapreduce.translation.Graphs; import org.apache.beam.runners.mapreduce.translation.GraphConverter; import org.apache.beam.runners.mapreduce.translation.GraphPlanner; import org.apache.beam.runners.mapreduce.translation.JobPrototype; +import org.apache.beam.runners.mapreduce.translation.TranslationContext; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; /** - * {@link PipelineRunner} for crunch. + * {@link PipelineRunner} for MapReduce. */ public class MapReduceRunner extends PipelineRunner<PipelineResult> { @@ -39,22 +55,20 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult> { @Override public PipelineResult run(Pipeline pipeline) { - GraphConverter graphConverter = new GraphConverter(); + TranslationContext context = new TranslationContext(options); + GraphConverter graphConverter = new GraphConverter(context); pipeline.traverseTopologically(graphConverter); - Graph graph = graphConverter.getGraph(); - GraphPlanner planner = new GraphPlanner(); - Graph fusedGraph = planner.plan(graph); - for (Graph.Vertex vertex : fusedGraph.getAllVertices()) { - if (vertex.getStep().getTransform() instanceof GroupByKey) { - JobPrototype jobPrototype = JobPrototype.create(1, vertex); - try { - Job job = jobPrototype.build(options.getJarClass(), new Configuration()); - job.waitForCompletion(true); - } catch (Exception e) { - Throwables.throwIfUnchecked(e); - } + Graphs.FusedGraph fusedGraph = 
planner.plan(context.getInitGraph()); + int stageId = 0; + for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) { + JobPrototype jobPrototype = JobPrototype.create(stageId++, fusedStep, options); + try { + Job job = jobPrototype.build(options.getJarClass(), new Configuration()); + job.waitForCompletion(true); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); } } return null; http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java deleted file mode 100644 index d0c7b78..0000000 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java +++ /dev/null @@ -1,218 +0,0 @@ -package org.apache.beam.runners.mapreduce; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.StringTokenizer; - -import javax.annotation.Nullable; -import org.apache.beam.runners.mapreduce.translation.BeamInputFormat; -import org.apache.beam.runners.mapreduce.translation.BeamMapper; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.OffsetBasedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.values.KV; -import 
org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.log4j.BasicConfigurator; - -public class MapReduceWordCount { - - public static class CreateSource<T> extends OffsetBasedSource<T> { - private final List<byte[]> allElementsBytes; - private final long totalSize; - private final Coder<T> coder; - - public static <T> CreateSource<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder) - throws CoderException, IOException { - ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder(); - long totalSize = 0L; - for (T element : elements) { - byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element); - allElementsBytes.add(bytes); - totalSize += bytes.length; - } - return new CreateSource<>(allElementsBytes.build(), totalSize, elemCoder); - } - - /** - * Create a new source with the specified bytes. The new source owns the input element bytes, - * which must not be modified after this constructor is called. 
- */ - private CreateSource(List<byte[]> elementBytes, long totalSize, Coder<T> coder) { - super(0, elementBytes.size(), 1); - this.allElementsBytes = ImmutableList.copyOf(elementBytes); - this.totalSize = totalSize; - this.coder = coder; - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - return totalSize; - } - - @Override - public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) - throws IOException { - return new BytesReader<>(this); - } - - @Override - public void validate() {} - - @Override - public Coder<T> getDefaultOutputCoder() { - return coder; - } - - @Override - public long getMaxEndOffset(PipelineOptions options) throws Exception { - return allElementsBytes.size(); - } - - @Override - public OffsetBasedSource<T> createSourceForSubrange(long start, long end) { - List<byte[]> primaryElems = allElementsBytes.subList((int) start, (int) end); - long primarySizeEstimate = - (long) (totalSize * primaryElems.size() / (double) allElementsBytes.size()); - return new CreateSource<>(primaryElems, primarySizeEstimate, coder); - } - - @Override - public long getBytesPerOffset() { - if (allElementsBytes.size() == 0) { - return 1L; - } - return Math.max(1, totalSize / allElementsBytes.size()); - } - - private static class BytesReader<T> extends OffsetBasedReader<T> { - private int index; - /** - * Use an optional to distinguish between null next element (as Optional.absent()) and no next - * element (next is null). 
- */ - @Nullable - private Optional<T> next; - - public BytesReader(CreateSource<T> source) { - super(source); - index = -1; - } - - @Override - @Nullable - public T getCurrent() throws NoSuchElementException { - if (next == null) { - throw new NoSuchElementException(); - } - return next.orNull(); - } - - @Override - public void close() throws IOException {} - - @Override - protected long getCurrentOffset() { - return index; - } - - @Override - protected boolean startImpl() throws IOException { - return advanceImpl(); - } - - @Override - public synchronized CreateSource<T> getCurrentSource() { - return (CreateSource<T>) super.getCurrentSource(); - } - - @Override - protected boolean advanceImpl() throws IOException { - CreateSource<T> source = getCurrentSource(); - if (index + 1 >= source.allElementsBytes.size()) { - next = null; - return false; - } - index++; - next = - Optional.fromNullable( - CoderUtils.decodeFromByteArray(source.coder, source.allElementsBytes.get(index))); - return true; - } - } - } - - public static class TokenizerMapper - extends Mapper<Object, Text, Text, IntWritable>{ - - private final static IntWritable one = new IntWritable(1); - private Text word = new Text(); - - public void map(Object key, Text value, Context context - ) throws IOException, InterruptedException { - StringTokenizer itr = new StringTokenizer(value.toString()); - while (itr.hasMoreTokens()) { - word.set(itr.nextToken()); - context.write(word, one); - } - } - } - - public static class IntSumReducer - extends Reducer<Text, IntWritable, Text, IntWritable> { - private IntWritable result = new IntWritable(); - - public void reduce(Text key, Iterable<IntWritable> values, Context context) - throws IOException, InterruptedException { - int sum = 0; - for (IntWritable val : values) { - sum += val.get(); - } - result.set(sum); - context.write(key, result); - } - } - - public static void main(String[] args) throws Exception { - BasicConfigurator.configure(); - - Configuration conf = 
new Configuration(); - - BoundedSource<KV<String, Integer>> source = CreateSource.fromIterable( - ImmutableList.of(KV.of("k1", 10), KV.of("k2", 2)), - KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())); - - conf.set( - BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE, - Base64.encodeBase64String(SerializableUtils.serializeToByteArray(source))); - - Job job = Job.getInstance(conf, "word count"); - job.setJarByClass(MapReduceWordCount.class); - job.setInputFormatClass(BeamInputFormat.class); - job.setMapperClass(BeamMapper.class); - //job.setMapperClass(TokenizerMapper.class); - //job.setCombinerClass(IntSumReducer.class); - //job.setReducerClass(IntSumReducer.class); - //job.setOutputKeyClass(Text.class); - //job.setOutputValueClass(IntWritable.class); - //FileInputFormat.addInputPath(job, new Path(args[0])); - job.setOutputFormatClass(NullOutputFormat.class); - System.exit(job.waitForCompletion(true) ? 0 : 1); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java new file mode 100644 index 0000000..d511405 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * MapReduce runner implementation. + */ +package org.apache.beam.runners.mapreduce; http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java index 0cfb14b..8a27a85 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; @@ -14,8 +31,6 @@ import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TimestampedValue; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; @@ -66,6 +81,7 @@ public class BeamInputFormat<T> extends InputFormat { @Override public RecordReader createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + // TODO: it should initiates from InputSplit. source = (BoundedSource<T>) SerializableUtils.deserializeFromByteArray( Base64.decodeBase64(context.getConfiguration().get(BEAM_SERIALIZED_BOUNDED_SOURCE)), ""); @@ -121,6 +137,7 @@ public class BeamInputFormat<T> extends InputFormat { @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!started) { + started = true; return reader.start(); } else { return reader.advance(); http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java index b5e4edc..bc52967 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java 
@@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; @@ -9,7 +26,7 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.TaskInputOutputContext; /** - * Created by peihe on 21/07/2017. + * Adapter for executing Beam transforms in {@link Mapper}. 
*/ public class BeamMapper<ValueInT, ValueOutT> extends Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>> { http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java index 9b8bd82..3490b3b 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; @@ -19,8 +36,9 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskInputOutputContext; + /** - * Created by peihe on 25/07/2017. 
+ * Adapter for executing Beam transforms in {@link Reducer}. */ public class BeamReducer<ValueInT, ValueOutT> extends Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>> { http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java new file mode 100644 index 0000000..191b346 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import org.apache.beam.sdk.util.WindowedValue; + +/** + * Flatten operation. 
+ */ +public class FlattenOperation<T> extends Operation<T> { + + public FlattenOperation() { + super(1); + } + + @Override + public void process(WindowedValue elem) { + for (OutputReceiver receiver : getOutputReceivers()) { + receiver.process(elem); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java new file mode 100644 index 0000000..8860caf --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import org.apache.beam.sdk.transforms.Flatten; + +/** + * Translates a {@link Flatten} to a {@link FlattenOperation}. 
+ */ +public class FlattenTranslator<T> extends TransformTranslator.Default<Flatten.PCollections<T>> { + @Override + public void translateNode(Flatten.PCollections<T> transform, TranslationContext context) { + TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); + + Operation<?> operation = new FlattenOperation(); + context.addInitStep(Graphs.Step.of( + userGraphContext.getStepName(), + operation, + userGraphContext.getInputTags(), + userGraphContext.getOutputTags())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java index e360419..b6900cc 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java @@ -1,311 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.beam.runners.mapreduce.translation; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import com.google.auto.value.AutoValue; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import java.util.LinkedList; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Iterables; +import com.google.common.graph.ElementOrder; +import com.google.common.graph.GraphBuilder; +import com.google.common.graph.MutableGraph; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Set; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.apache.commons.lang.builder.ReflectionToStringBuilder; -import org.apache.commons.lang.builder.ToStringStyle; /** - * Created by peihe on 06/07/2017. + * Graph that represents a Beam DAG. 
*/ -public class Graph { +public class Graph<StepT extends Graph.AbstractStep<TagT>, TagT extends Graph.AbstractTag> { - private final Map<Step, Vertex> vertices; - private final Map<HeadTail, Edge> edges; - private final Set<Vertex> leafVertices; + private final MutableGraph<Vertex> graph; public Graph() { - this.vertices = Maps.newHashMap(); - this.edges = Maps.newHashMap(); - this.leafVertices = Sets.newHashSet(); + this.graph = GraphBuilder.directed() + .allowsSelfLoops(false) + .nodeOrder(ElementOrder.insertion()) + .build(); } - public Vertex addVertex(Step step) { - checkState(!vertices.containsKey(step)); - Vertex v = new Vertex(step); - vertices.put(step, v); - leafVertices.add(v); - return v; + /** + * Adds {@link StepT} to this {@link Graph}. + */ + public void addStep(StepT step) { + graph.addNode(step); + Set<Vertex> nodes = graph.nodes(); + for (TagT tag : step.getInputTags()) { + if (!nodes.contains(tag)) { + graph.addNode(tag); + } + graph.putEdge(tag, step); + } + for (TagT tag : step.getOutputTags()) { + if (!nodes.contains(tag)) { + graph.addNode(tag); + } + graph.putEdge(step, tag); + } } - public Edge addEdge(Vertex head, Vertex tail, Coder<?> coder) { - HeadTail headTail = HeadTail.of(head, tail); - checkState(!edges.containsKey(headTail)); - Edge e = new Edge(headTail, coder); - edges.put(headTail, e); - head.addOutgoing(e); - tail.addIncoming(e); - leafVertices.remove(head); - return e; + public void removeStep(StepT step) { + graph.removeNode(step); } - public Vertex getVertex(Step step) { - return vertices.get(step); + public void removeTag(TagT tag) { + graph.removeNode(tag); } - public Edge getEdge(Vertex head, Vertex tail) { - return edges.get(HeadTail.of(head, tail)); + public void addEdge(TagT inTag, StepT step) { + graph.putEdge(inTag, step); } - public Iterable<Vertex> getAllVertices() { - return vertices.values(); + public void addEdge(StepT step, TagT outTag) { + graph.putEdge(step, outTag); } - public Iterable<Edge> getAllEdges() 
{ - return edges.values(); + public void removeEdge(TagT inTag, StepT step) { + graph.removeEdge(inTag, step); } - public Iterable<Vertex> getLeafVertices() { - return ImmutableList.copyOf(leafVertices); + public void removeEdge(StepT step, TagT outTag) { + graph.removeEdge(step, outTag); } - public void accept(GraphVisitor visitor) { - for (Vertex v : leafVertices) { - v.accept(visitor); - } + public List<StepT> getSteps() { + return castToStepList(FluentIterable.from(graph.nodes()) + .filter(new Predicate<Vertex>() { + @Override + public boolean apply(Vertex input) { + return input instanceof AbstractStep; + }})) + .toList(); } - //TODO: add equals, hashCode, toString for following classses. - - public static class Vertex { - private final Step step; - private final Set<Edge> incoming; - private final Set<Edge> outgoing; - - public Vertex(Step step) { - this.step = checkNotNull(step, "step"); - this.incoming = Sets.newHashSet(); - this.outgoing = Sets.newHashSet(); - } - - public Step getStep() { - return step; - } - - public Set<Edge> getIncoming() { - return incoming; - } - - public Set<Edge> getOutgoing() { - return outgoing; - } - - public boolean isSource() { - PTransform<?, ?> transform = step.getTransform(); - return transform instanceof Read.Bounded || transform instanceof Read.Unbounded; - } - - public boolean isGroupByKey() { - return step.getTransform() instanceof GroupByKey; - } - - public void addIncoming(Edge edge) { - incoming.add(edge); - } - - public void addOutgoing(Edge edge) { - outgoing.add(edge); - } - - public void accept(GraphVisitor visitor) { - PTransform<?, ?> transform = step.getTransform(); - if (transform instanceof ParDo.SingleOutput || transform instanceof ParDo.MultiOutput - || transform instanceof Window.Assign) { - visitor.visitParDo(this); - } else if (transform instanceof GroupByKey) { - visitor.visitGroupByKey(this); - } else if (transform instanceof Read.Bounded) { - visitor.visitRead(this); - } else if (transform instanceof 
Flatten.PCollections - || transform instanceof Flatten.Iterables) { - visitor.visitFlatten(this); - } else { - throw new RuntimeException("Unexpected transform type: " + transform.getClass()); - } - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - if (obj instanceof Vertex) { - Vertex other = (Vertex) obj; - return step.equals(other.step); - } - return false; - } - - @Override - public int hashCode() { - return Objects.hash(this.getClass(), step); - } - - @Override - public String toString() { - return new ReflectionToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) - .setExcludeFieldNames(new String[] { "outgoing", "incoming" }).toString(); - } + public List<StepT> getStartSteps() { + return castToStepList(FluentIterable.from(graph.nodes()) + .filter(new Predicate<Vertex>() { + @Override + public boolean apply(Vertex input) { + return input instanceof AbstractStep && graph.inDegree(input) == 0; + }})) + .toList(); } - public static class Edge { - private final HeadTail headTail; - private final Coder<?> coder; - private final Set<NodePath> paths; - - public static Edge of(HeadTail headTail, Coder<?> coder) { - return new Edge(headTail, coder); - } - - private Edge(HeadTail headTail, Coder<?> coder) { - this.headTail = checkNotNull(headTail, "headTail"); - this.coder = checkNotNull(coder, "coder"); - this.paths = Sets.newHashSet(); - } - - public Vertex getHead() { - return headTail.getHead(); - } - - public Vertex getTail() { - return headTail.getTail(); - } - - public Coder<?> getCoder() { - return coder; - } - - public Set<NodePath> getPaths() { - return paths; - } - - public void addPath(NodePath path) { - paths.add(checkNotNull(path, "path")); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - if (obj instanceof Edge) { - Edge other = (Edge) obj; - return headTail.equals(other.headTail) - && paths.equals(other.paths) && coder.equals(other.coder); - } - return 
false; - } - - @Override - public int hashCode() { - return Objects.hash(headTail, paths, coder); - } - - @Override - public String toString() { - return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE); - } + public List<TagT> getInputTags() { + return castToTagList(FluentIterable.from(graph.nodes()) + .filter(new Predicate<Vertex>() { + @Override + public boolean apply(Vertex input) { + return input instanceof AbstractTag && graph.inDegree(input) == 0; + }})) + .toList(); } - public static class NodePath { - private final LinkedList<Step> path; - - public NodePath() { - this.path = new LinkedList<>(); - } - - public NodePath(NodePath nodePath) { - this.path = new LinkedList<>(nodePath.path); - } + public List<TagT> getOutputTags() { + return castToTagList(FluentIterable.from(graph.nodes()) + .filter(new Predicate<Vertex>() { + @Override + public boolean apply(Vertex input) { + return input instanceof AbstractTag && graph.outDegree(input) == 0; + }})) + .toList(); + } - public void addFirst(Step step) { - path.addFirst(step); - } + public StepT getProducer(TagT tag) { + return (StepT) Iterables.getOnlyElement(graph.predecessors(tag)); + } - public void addLast(Step step) { - path.addLast(step); - } + public List<StepT> getConsumers(TagT tag) { + return castToStepList(graph.successors(tag)).toList(); + } - public Iterable<Step> steps() { - return ImmutableList.copyOf(path); - } + private FluentIterable<StepT> castToStepList(Iterable<Vertex> vertices) { + return FluentIterable.from(vertices) + .transform(new Function<Vertex, StepT>() { + @Override + public StepT apply(Vertex input) { + return (StepT) input; + }}); + } - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - if (obj instanceof NodePath) { - NodePath other = (NodePath) obj; - return path.equals(other.path); - } - return false; - } + private FluentIterable<TagT> castToTagList(Iterable<Vertex> vertices) { + return FluentIterable.from(vertices) 
+ .transform(new Function<Vertex, TagT>() { + @Override + public TagT apply(Vertex input) { + return (TagT) input; + }}); + } - @Override - public int hashCode() { - return Objects.hash(this.getClass(), path.hashCode()); + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - for (Step step : path) { - sb.append(step.getFullName() + "|"); - } - if (path.size() > 0) { - sb.deleteCharAt(sb.length() - 1); - } - return sb.toString(); + if (obj instanceof Graph) { + Graph other = (Graph) obj; + return com.google.common.graph.Graphs.equivalent(this.graph, other.graph); } + return false; } - @AutoValue - public abstract static class Step { - abstract String getFullName(); - // TODO: remove public - public abstract PTransform<?, ?> getTransform(); - abstract WindowingStrategy<?, ?> getWindowingStrategy(); - abstract List<TupleTag<?>> getInputs(); - abstract List<TupleTag<?>> getOutputs(); + @Override + public int hashCode() { + return Objects.hash(this.getClass(), graph.nodes()); + } - public static Step of( - String fullName, - PTransform<?, ?> transform, - WindowingStrategy<?, ?> windowingStrategy, - List<TupleTag<?>> inputs, - List<TupleTag<?>> outputs) { - return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graph_Step( - fullName, transform, windowingStrategy, inputs, outputs); - } + /** + * Vertex interface of this Graph. 
+ */ + interface Vertex { } - @AutoValue - public abstract static class HeadTail { - abstract Vertex getHead(); - abstract Vertex getTail(); + public abstract static class AbstractStep<TagT extends AbstractTag> implements Vertex { + public abstract List<TagT> getInputTags(); + public abstract List<TagT> getOutputTags(); + } - public static HeadTail of(Vertex head, Vertex tail) { - return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graph_HeadTail(head, tail); - } + public abstract static class AbstractTag implements Vertex { } } http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java index e7e7598..1e818fa 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java @@ -1,77 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.mapreduce.translation; -import com.google.common.collect.ImmutableList; +import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.Maps; import java.util.Map; import org.apache.beam.runners.mapreduce.MapReduceRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; /** * Pipeline translator for {@link MapReduceRunner}. */ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults { + private final TranslationContext context; private final Map<PValue, TupleTag<?>> pValueToTupleTag; - private final Map<TupleTag<?>, Graph.Vertex> outputToProducer; - private final Graph graph; - public GraphConverter() { + public GraphConverter(TranslationContext context) { + this.context = checkNotNull(context, "context"); this.pValueToTupleTag = Maps.newHashMap(); - this.outputToProducer = Maps.newHashMap(); - this.graph = new Graph(); } @Override - public void visitPrimitiveTransform(TransformHierarchy.Node node) { - WindowingStrategy<?, ?> windowingStrategy = - getWindowingStrategy(node.getOutputs().values().iterator().next()); - Graph.Step step = Graph.Step.of( - node.getFullName(), - node.getTransform(), - windowingStrategy, - ImmutableList.copyOf(node.getInputs().keySet()), - ImmutableList.copyOf(node.getOutputs().keySet())); - Graph.Vertex v = graph.addVertex(step); + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + // 
check if current composite transforms need to be translated. + // If not, all sub transforms will be translated in visitPrimitiveTransform. + PTransform<?, ?> transform = node.getTransform(); + if (transform != null) { + TransformTranslator translator = TranslatorRegistry.getTranslator(transform); - for (PValue pValue : node.getInputs().values()) { - TupleTag<?> tag = pValueToTupleTag.get(pValue); - if (outputToProducer.containsKey(tag)) { - Graph.Vertex producer = outputToProducer.get(tag); - - PCollection<?> pc = (PCollection<?>) pValue; - graph.addEdge(producer, v, pc.getCoder()); + if (translator != null && applyCanTranslate(transform, node, translator)) { + applyTransform(transform, node, translator); + return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; } } + return CompositeBehavior.ENTER_TRANSFORM; + } - for (Map.Entry<TupleTag<?>, PValue> entry : node.getOutputs().entrySet()) { - pValueToTupleTag.put(entry.getValue(), entry.getKey()); - outputToProducer.put(entry.getKey(), v); + @Override + public void visitPrimitiveTransform(TransformHierarchy.Node node) { + if (!node.isRootNode()) { + PTransform<?, ?> transform = node.getTransform(); + TransformTranslator translator = TranslatorRegistry.getTranslator(transform); + if (translator == null || !applyCanTranslate(transform, node, translator)) { + throw new UnsupportedOperationException( + "The transform " + transform + " is currently not supported."); + } + applyTransform(transform, node, translator); } } - private WindowingStrategy<?, ?> getWindowingStrategy(PValue pValue) { - if (pValue instanceof PCollection) { - return ((PCollection) pValue).getWindowingStrategy(); - } else if (pValue instanceof PCollectionList) { - return ((PCollectionList) pValue).get(0).getWindowingStrategy(); - } else if (pValue instanceof PCollectionTuple) { - return ((PCollectionTuple) pValue).getAll().values().iterator().next().getWindowingStrategy(); - } else if (pValue instanceof PCollectionView) { - return ((PCollectionView) 
pValue).getPCollection().getWindowingStrategy(); - } else { - throw new RuntimeException("Unexpected pValue type: " + pValue.getClass()); - } + private <T extends PTransform<?, ?>> void applyTransform( + PTransform<?, ?> transform, + TransformHierarchy.Node node, + TransformTranslator<?> translator) { + @SuppressWarnings("unchecked") + T typedTransform = (T) transform; + @SuppressWarnings("unchecked") + TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator; + context.getUserGraphContext().setCurrentNode(node); + typedTranslator.translateNode(typedTransform, context); } - public Graph getGraph() { - return graph; + private <T extends PTransform<?, ?>> boolean applyCanTranslate( + PTransform<?, ?> transform, + TransformHierarchy.Node node, + TransformTranslator<?> translator) { + @SuppressWarnings("unchecked") + T typedTransform = (T) transform; + @SuppressWarnings("unchecked") + TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator; + context.getUserGraphContext().setCurrentNode(node); + return typedTranslator.canTranslate(typedTransform, context); } } http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java index 9ae8365..be694e4 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java @@ -1,117 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.mapreduce.translation; -import static com.google.common.base.Preconditions.checkArgument; - -import org.apache.beam.sdk.coders.Coder; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; /** - * Created by peihe on 06/07/2017. + * Class that optimizes the initial graph to a fused graph. */ public class GraphPlanner { - public Graph plan(Graph initGraph) { - FusionVisitor fusionVisitor = new FusionVisitor(); - initGraph.accept(fusionVisitor); - return fusionVisitor.getFusedGraph(); - } - - private class FusionVisitor implements GraphVisitor { - private Graph fusedGraph; - private Graph.Vertex workingVertex; - private Graph.NodePath workingPath; - private Coder<?> workingEdgeCoder; - - FusionVisitor() { - fusedGraph = new Graph(); - workingVertex = null; - workingPath = null; - } - - @Override - public void visitRead(Graph.Vertex read) { - if (workingVertex == null) { - // drop if read is leaf vertex. 
- return; - } - Graph.Vertex v = fusedGraph.addVertex(read.getStep()); - workingPath.addFirst(read.getStep()); - Graph.Edge edge = fusedGraph.addEdge(v, workingVertex, workingEdgeCoder); - edge.addPath(workingPath); - } - - @Override - public void visitParDo(Graph.Vertex parDo) { - Graph.Step step = parDo.getStep(); - checkArgument( - step.getTransform().getAdditionalInputs().isEmpty(), - "Side inputs are not " + "supported."); - checkArgument( - parDo.getIncoming().size() == 1, - "Side inputs are not supported."); - Graph.Edge inEdge = parDo.getIncoming().iterator().next(); + public GraphPlanner() { + } - if (workingVertex == null) { - // Leaf vertex - workingVertex = fusedGraph.addVertex(step); - workingPath = new Graph.NodePath(); - workingEdgeCoder = inEdge.getCoder(); - } else { - workingPath.addFirst(step); - } - processParent(inEdge.getHead()); - } + public Graphs.FusedGraph plan(Graph<Graphs.Step, Graphs.Tag> initGraph) { + Graphs.FusedGraph fusedGraph = new Graphs.FusedGraph(); + // Convert from the list of steps to Graphs. 
+ for (Graphs.Step step : Lists.reverse(initGraph.getSteps())) { + Graphs.FusedStep fusedStep = new Graphs.FusedStep(); + fusedStep.addStep(step); + fusedGraph.addFusedStep(fusedStep); - @Override - public void visitFlatten(Graph.Vertex flatten) { - if (workingVertex == null) { - return; - } - Graph.NodePath basePath = workingPath; - Graph.Vertex baseVertex = workingVertex; - for (Graph.Edge e : flatten.getIncoming()) { - workingPath = new Graph.NodePath(basePath); - workingVertex = baseVertex; - workingEdgeCoder = e.getCoder(); - processParent(e.getHead()); - } + tryFuse(fusedGraph, fusedStep); } + return fusedGraph; + } - @Override - public void visitGroupByKey(Graph.Vertex groupByKey) { - if (workingVertex == null) { - return; - } - Graph.Step step = groupByKey.getStep(); - Graph.Vertex addedGroupByKey = fusedGraph.addVertex(step); - - Graph.Edge edge = fusedGraph.addEdge( - addedGroupByKey, - workingVertex, - workingEdgeCoder); - edge.addPath(workingPath); - Graph.Edge inEdge = groupByKey.getIncoming().iterator().next(); - workingVertex = addedGroupByKey; - workingPath = new Graph.NodePath(); - workingEdgeCoder = inEdge.getCoder(); - processParent(inEdge.getHead()); + private void tryFuse(Graphs.FusedGraph fusedGraph, Graphs.FusedStep fusedStep) { + if (fusedStep.getOutputTags().size() != 1) { + return; } - - public Graph getFusedGraph() { - return fusedGraph; + Graphs.Tag outTag = Iterables.getOnlyElement(fusedStep.getOutputTags()); + if (fusedGraph.getConsumers(outTag).size() != 1) { + return; } - - private void processParent(Graph.Vertex parent) { - Graph.Step step = parent.getStep(); - Graph.Vertex v = fusedGraph.getVertex(step); - if (v == null) { - parent.accept(this); - } else { - // TODO: parent is consumed more than once. - // It is duplicated in multiple outgoing path. Figure out the impact. 
- workingPath.addFirst(step); - fusedGraph.getEdge(v, workingVertex).addPath(workingPath); - } + Graphs.FusedStep consumer = Iterables.getOnlyElement(fusedGraph.getConsumers(outTag)); + if (fusedStep.containsGroupByKey() && consumer.containsGroupByKey()) { + return; } + fusedGraph.merge(fusedStep, consumer); } } http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java deleted file mode 100644 index fe4a76f..0000000 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphVisitor.java +++ /dev/null @@ -1,11 +0,0 @@ -package org.apache.beam.runners.mapreduce.translation; - -/** - * Created by peihe on 06/07/2017. - */ -public interface GraphVisitor { - void visitRead(Graph.Vertex read); - void visitParDo(Graph.Vertex parDo); - void visitFlatten(Graph.Vertex flatten); - void visitGroupByKey(Graph.Vertex groupByKey); -} http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java new file mode 100644 index 0000000..029d425 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import com.google.auto.value.AutoValue; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Class that defines graph vertices. + */ +public class Graphs { + + private Graphs() {} + + public static class FusedGraph { + private final Graph<FusedStep, Tag> graph; + + public FusedGraph() { + this.graph = new Graph<>(); + } + + public void addFusedStep(FusedStep fusedStep) { + graph.addStep(fusedStep); + } + + public void merge(FusedStep src, FusedStep dest) { + for (Step step : src.steps.getSteps()) { + dest.addStep(step); + } + for (Tag inTag : src.getInputTags()) { + graph.addEdge(inTag, dest); + } + for (Tag outTag : src.getOutputTags()) { + graph.addEdge(dest, outTag); + } + graph.removeStep(src); + } + + public FusedStep getProducer(Tag tag) { + return graph.getProducer(tag); + } + + public List<FusedStep> getConsumers(Tag tag) { + return graph.getConsumers(tag); + } + + public List<FusedStep> getFusedSteps() { + return graph.getSteps(); + } + } + + public static class FusedStep extends Graph.AbstractStep<Tag> { + private final Graph<Step, Tag> steps; + private Step groupByKeyStep; + + public FusedStep() { + 
this.steps = new Graph<>(); + this.groupByKeyStep = null; + } + + @Override + public List<Tag> getInputTags() { + return steps.getInputTags(); + } + + @Override + public List<Tag> getOutputTags() { + return steps.getOutputTags(); + } + + public void addStep(Step step) { + steps.addStep(step); + if (step.getOperation() instanceof GroupByKeyOperation) { + groupByKeyStep = step; + } + } + + public void removeStep(Step step) { + steps.removeStep(step); + } + + public void removeTag(Tag tag) { + steps.removeTag(tag); + } + + public void addEdge(Tag inTag, Step step) { + steps.addEdge(inTag, step); + } + + public void addEdge(Step step, Tag outTag) { + steps.addEdge(step, outTag); + } + + public void removeEdge(Tag inTag, Step step) { + steps.removeEdge(inTag, step); + } + + public void removeEdge(Step step, Tag outTag) { + steps.removeEdge(step, outTag); + } + + public Step getProducer(Tag tag) { + return steps.getProducer(tag); + } + + public List<Step> getConsumers(Tag tag) { + return steps.getConsumers(tag); + } + + public List<Step> getSteps() { + return steps.getSteps(); + } + + public List<Step> getStartSteps() { + return steps.getStartSteps(); + } + + public boolean containsGroupByKey() { + return groupByKeyStep != null; + } + + @Nullable + public Step getGroupByKeyStep() { + return groupByKeyStep; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (Step step : steps.getSteps()) { + sb.append(step.getFullName() + "|"); + } + if (sb.length() > 0) { + sb.deleteCharAt(sb.length() - 1); + } + return sb.toString(); + } + + public String getFullName() { + return toString(); + } + } + + @AutoValue + public abstract static class Step extends Graph.AbstractStep<Tag> { + abstract String getFullName(); + // TODO: remove public + public abstract Operation getOperation(); + + public static Step of( + String fullName, + Operation operation, + List<Tag> inputTags, + List<Tag> outputTags) { + return new 
org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Step( + inputTags, outputTags, fullName, operation); + } + } + + @AutoValue + public abstract static class Tag extends Graph.AbstractTag { + abstract TupleTag<?> getTupleTag(); + abstract Coder<?> getCoder(); + + public static Tag of(TupleTag<?> tupleTag, Coder<?> coder) { + return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Tag( + tupleTag, coder); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java index 1da8d26..66cf3b6 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java @@ -1,7 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.collect.ImmutableList; import java.util.List; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.sdk.coders.Coder; @@ -11,7 +29,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; /** - * Created by peihe on 26/07/2017. + * {@link Operation} that executes a {@link GroupAlsoByWindowsViaOutputBufferDoFn}. */ public class GroupAlsoByWindowsParDoOperation extends ParDoOperation { @@ -19,11 +37,9 @@ public class GroupAlsoByWindowsParDoOperation extends ParDoOperation { public GroupAlsoByWindowsParDoOperation( PipelineOptions options, - TupleTag<Object> mainOutputTag, - List<TupleTag<?>> sideOutputTags, WindowingStrategy<?, ?> windowingStrategy, Coder<?> inputCoder) { - super(options, mainOutputTag, sideOutputTags, windowingStrategy); + super(options, new TupleTag<>(), ImmutableList.<TupleTag<?>>of(), windowingStrategy); this.inputCoder = checkNotNull(inputCoder, "inputCoder"); } http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java index 8ee616d..5ac23a5 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyOperation.java new file mode 100644 index 0000000..b0be494 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyOperation.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * A GroupByKey placeholder {@link Operation} used during pipeline translation; never executed.
+ */
+public class GroupByKeyOperation<K, V> extends Operation<KV<K, V>> {
+
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private final KvCoder<K, V> kvCoder;
+
+  public GroupByKeyOperation(WindowingStrategy<?, ?> windowingStrategy, KvCoder<K, V> kvCoder) {
+    super(1);
+    this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
+    this.kvCoder = checkNotNull(kvCoder, "kvCoder");
+  }
+
+  @Override
+  public void process(WindowedValue elem) {
+    throw new IllegalStateException(
+        String.format("%s should not be in the execution graph.", getClass().getSimpleName()));
+  }
+
+  public WindowingStrategy<?, ?> getWindowingStrategy() {
+    return windowingStrategy;
+  }
+
+  public KvCoder<K, V> getKvCoder() {
+    return kvCoder;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
new file mode 100644
index 0000000..e87ed09
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupByKeyTranslator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Translates a {@link GroupByKey} to {@link Operation Operations}.
+ *
+ * <p>The transform becomes a single {@link GroupByKeyOperation} placeholder step that
+ * carries the input's {@link WindowingStrategy} and {@link KvCoder}.
+ */
+class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
+  /**
+   * Adds a {@link GroupByKeyOperation} step for {@code transform} to {@code context}.
+   *
+   * @throws IllegalStateException if the input {@link PCollection} does not use a
+   *     {@link KvCoder}.
+   */
+  @Override
+  public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+    PCollection<?> inPCollection = (PCollection<?>) userGraphContext.getInput();
+    WindowingStrategy<?, ?> windowingStrategy = inPCollection.getWindowingStrategy();
+    Coder<?> inCoder = inPCollection.getCoder();
+    if (!(inCoder instanceof KvCoder)) {
+      throw new IllegalStateException(String.format(
+          "GroupByKey requires its input to use a KvCoder, but step %s has coder %s.",
+          userGraphContext.getStepName(), inCoder));
+    }
+    // The instanceof check above guards the cast; K and V are presumed to match the
+    // GroupByKey<K, V> input element type.
+    @SuppressWarnings("unchecked")
+    KvCoder<K, V> kvCoder = (KvCoder<K, V>) inCoder;
+
+    GroupByKeyOperation<K, V> groupByKeyOperation =
+        new GroupByKeyOperation<>(windowingStrategy, kvCoder);
+    context.addInitStep(Graphs.Step.of(
+        userGraphContext.getStepName(),
+        groupByKeyOperation,
+        userGraphContext.getInputTags(),
+        userGraphContext.getOutputTags()));
+  }
+}