http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java new file mode 100644 index 0000000..eaab3d1 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -0,0 +1,1043 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; +import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.types.FlinkCoder; +import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; +import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector; +import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem; +import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder; +import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator; +import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Sink; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.Write; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import 
org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.AppliedCombineFn; +import org.apache.beam.sdk.util.Reshuffle; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitStream; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.transformations.TwoInputTransformation; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class contains all the mappings between Beam and Flink + * <b>streaming</b> transformations. The {@link FlinkStreamingPipelineTranslator} + * traverses the Beam job and comes here to translate the encountered Beam transformations + * into Flink ones, based on the mappings available in this class. + */ +class FlinkStreamingTransformTranslators { + + // -------------------------------------------------------------------------------------------- + // Transform Translator Registry + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("rawtypes") + private static final Map< + Class<? extends PTransform>, + FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>(); + + // here you can find all the available translators.
+ static { + TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator()); + TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator()); + TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator()); + TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); + + TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator()); + TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator()); + + TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator()); + TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator()); + TRANSLATORS.put( + FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class, + new CreateViewStreamingTranslator()); + + TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming()); + TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator()); + TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator()); + } + + public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator( + PTransform<?, ?> transform) { + return TRANSLATORS.get(transform.getClass()); + } + + // -------------------------------------------------------------------------------------------- + // Transformation Implementations + // -------------------------------------------------------------------------------------------- + + private static class TextIOWriteBoundStreamingTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> { + + private static final Logger LOG = + LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class); + + @Override + public void translateNode( + TextIO.Write.Bound transform, + FlinkStreamingTranslationContext context) { + PValue input = context.getInput(transform); + DataStream<WindowedValue<String>> inputDataStream = context.getInputDataStream(input); + + String filenamePrefix = transform.getFilenamePrefix(); + String filenameSuffix = transform.getFilenameSuffix(); + boolean needsValidation = transform.needsValidation(); + int numShards = transform.getNumShards(); + String shardNameTemplate = transform.getShardNameTemplate(); + + // TODO: Implement these. We need Flink support for this. + LOG.warn( + "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", + needsValidation); + LOG.warn( + "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", + filenameSuffix); + LOG.warn( + "Translation of TextIO.Write.shardNameTemplate not yet supported. 
Is: {}.", + shardNameTemplate); + + DataStream<String> dataSink = inputDataStream + .flatMap(new FlatMapFunction<WindowedValue<String>, String>() { + @Override + public void flatMap( + WindowedValue<String> value, + Collector<String> out) + throws Exception { + out.collect(value.getValue()); + } + }); + DataStreamSink<String> output = + dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE); + + if (numShards > 0) { + output.setParallelism(numShards); + } + } + } + + private static class WriteSinkStreamingTranslator<T> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> { + + @Override + public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) { + String name = transform.getName(); + PValue input = context.getInput(transform); + + Sink<T> sink = transform.getSink(); + if (!(sink instanceof UnboundedFlinkSink)) { + throw new UnsupportedOperationException( + "At the time, only unbounded Flink sinks are supported."); + } + + DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input); + + inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() { + @Override + public void flatMap(WindowedValue<T> value, Collector<Object> out) throws Exception { + out.collect(value.getValue()); + } + }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name); + } + } + + private static class UnboundedReadSourceTranslator<T> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> { + + @Override + public void translateNode( + Read.Unbounded<T> transform, + FlinkStreamingTranslationContext context) { + PCollection<T> output = context.getOutput(transform); + + TypeInformation<WindowedValue<T>> outputTypeInfo = + context.getTypeInfo(context.getOutput(transform)); + + DataStream<WindowedValue<T>> source; + if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) { + @SuppressWarnings("unchecked") + UnboundedFlinkSource<T> flinkSourceFunction = + (UnboundedFlinkSource<T>) transform.getSource(); + + final AssignerWithPeriodicWatermarks<T> flinkAssigner = + flinkSourceFunction.getFlinkTimestampAssigner(); + + DataStream<T> flinkSource = context.getExecutionEnvironment() + .addSource(flinkSourceFunction.getFlinkSource()); + + flinkSourceFunction.setCoder( + new FlinkCoder<T>(flinkSource.getType(), + context.getExecutionEnvironment().getConfig())); + + source = flinkSource + .assignTimestampsAndWatermarks(flinkAssigner) + .flatMap(new FlatMapFunction<T, WindowedValue<T>>() { + @Override + public void flatMap(T s, Collector<WindowedValue<T>> collector) throws Exception { + collector.collect( + WindowedValue.of( + s, + new Instant(flinkAssigner.extractTimestamp(s, -1)), + GlobalWindow.INSTANCE, + PaneInfo.NO_FIRING)); + }}).returns(outputTypeInfo); + } else { + try { + UnboundedSourceWrapper<T, ?> sourceWrapper = + new UnboundedSourceWrapper<>( + context.getPipelineOptions(), + transform.getSource(), + context.getExecutionEnvironment().getParallelism()); + source = context + .getExecutionEnvironment() + .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo); + } catch (Exception e) { + throw new RuntimeException( + "Error while translating UnboundedSource: " + transform.getSource(), e); + } + } + + context.setOutputDataStream(output, source); + } + } + + private static class BoundedReadSourceTranslator<T> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> { + + @Override + public void 
translateNode( + Read.Bounded<T> transform, + FlinkStreamingTranslationContext context) { + PCollection<T> output = context.getOutput(transform); + + TypeInformation<WindowedValue<T>> outputTypeInfo = + context.getTypeInfo(context.getOutput(transform)); + + + DataStream<WindowedValue<T>> source; + try { + BoundedSourceWrapper<T> sourceWrapper = + new BoundedSourceWrapper<>( + context.getPipelineOptions(), + transform.getSource(), + context.getExecutionEnvironment().getParallelism()); + source = context + .getExecutionEnvironment() + .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo); + } catch (Exception e) { + throw new RuntimeException( + "Error while translating BoundedSource: " + transform.getSource(), e); + } + + context.setOutputDataStream(output, source); + } + } + + private static void rejectSplittable(DoFn<?, ?> doFn) { + DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); + if (signature.processElement().isSplittable()) { + throw new UnsupportedOperationException( + String.format( + "%s does not currently support splittable DoFn: %s", + FlinkRunner.class.getSimpleName(), doFn)); + } + } + + private static class ParDoBoundStreamingTranslator<InputT, OutputT> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + ParDo.Bound<InputT, OutputT>> { + + @Override + public void translateNode( + ParDo.Bound<InputT, OutputT> transform, + FlinkStreamingTranslationContext context) { + + DoFn<InputT, OutputT> doFn = transform.getFn(); + rejectSplittable(doFn); + + WindowingStrategy<?, ?> windowingStrategy = + context.getOutput(transform).getWindowingStrategy(); + + TypeInformation<WindowedValue<OutputT>> typeInfo = + context.getTypeInfo(context.getOutput(transform)); + + List<PCollectionView<?>> sideInputs = transform.getSideInputs(); + + @SuppressWarnings("unchecked") + PCollection<InputT> inputPCollection = (PCollection<InputT>) context.getInput(transform); + + Coder<WindowedValue<InputT>> inputCoder = context.getCoder(inputPCollection); + + DataStream<WindowedValue<InputT>> inputDataStream = + context.getInputDataStream(context.getInput(transform)); + Coder keyCoder = null; + boolean stateful = false; + DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + if (signature.stateDeclarations().size() > 0 + || signature.timerDeclarations().size() > 0) { + // Based on the fact that the signature is stateful, DoFnSignatures ensures + // that it is also keyed + keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder(); + inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder)); + stateful = true; + } + + if (sideInputs.isEmpty()) { + DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator = + new DoFnOperator<>( + transform.getFn(), + inputCoder, + new TupleTag<OutputT>("main output"), + Collections.<TupleTag<?>>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(), + windowingStrategy, + new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ + Collections.<PCollectionView<?>>emptyList(), /* side inputs */ + context.getPipelineOptions(), + keyCoder); + + SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream = inputDataStream + .transform(transform.getName(), typeInfo, doFnOperator); + + context.setOutputDataStream(context.getOutput(transform), outDataStream); + } else { + Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs = + transformSideInputs(sideInputs, context); + 
+ DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator = + new DoFnOperator<>( + transform.getFn(), + inputCoder, + new TupleTag<OutputT>("main output"), + Collections.<TupleTag<?>>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(), + windowingStrategy, + transformedSideInputs.f0, + sideInputs, + context.getPipelineOptions(), + keyCoder); + + SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream; + if (stateful) { + // we have to manually construct the two-input transform because we're not + // allowed to have only one input keyed, normally. + KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream; + TwoInputTransformation< + WindowedValue<KV<?, InputT>>, + RawUnionValue, + WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation<>( + keyedStream.getTransformation(), + transformedSideInputs.f1.broadcast().getTransformation(), + transform.getName(), + (TwoInputStreamOperator) doFnOperator, + typeInfo, + keyedStream.getParallelism()); + + rawFlinkTransform.setStateKeyType(keyedStream.getKeyType()); + rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null); + + outDataStream = new SingleOutputStreamOperator( + keyedStream.getExecutionEnvironment(), + rawFlinkTransform) {}; // we have to cheat around the ctor being protected + + keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform); + } else { + outDataStream = inputDataStream + .connect(transformedSideInputs.f1.broadcast()) + .transform(transform.getName(), typeInfo, doFnOperator); + } + context.setOutputDataStream(context.getOutput(transform), outDataStream); + } + } + } + + /** + * Wraps each element in a {@link RawUnionValue} with the given tag id. + */ + private static class ToRawUnion<T> implements MapFunction<T, RawUnionValue> { + private final int intTag; + + public ToRawUnion(int intTag) { + this.intTag = intTag; + } + + @Override + public RawUnionValue map(T o) throws Exception { + return new RawUnionValue(intTag, o); + } + } + + private static Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> + transformSideInputs( + Collection<PCollectionView<?>> sideInputs, + FlinkStreamingTranslationContext context) { + + // collect all side inputs + Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>(); + Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>(); + int count = 0; + for (PCollectionView<?> sideInput: sideInputs) { + TupleTag<?> tag = sideInput.getTagInternal(); + intToViewMapping.put(count, sideInput); + tagToIntMapping.put(tag, count); + count++; + Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal(); + } + + + List<Coder<?>> inputCoders = new ArrayList<>(); + for (PCollectionView<?> sideInput: sideInputs) { + DataStream<Object> sideInputStream = context.getInputDataStream(sideInput); + TypeInformation<Object> tpe = sideInputStream.getType(); + if (!(tpe instanceof CoderTypeInformation)) { + throw new IllegalStateException( + "Input Stream TypeInformation is not a CoderTypeInformation."); + } + + Coder<?> coder = ((CoderTypeInformation) tpe).getCoder(); + inputCoders.add(coder); + } + + UnionCoder unionCoder = UnionCoder.of(inputCoders); + + CoderTypeInformation<RawUnionValue> unionTypeInformation = + new CoderTypeInformation<>(unionCoder); + + // transform each side input to RawUnionValue and union them + DataStream<RawUnionValue> sideInputUnion = null; + + for (PCollectionView<?> sideInput: sideInputs) { + TupleTag<?> tag = sideInput.getTagInternal(); + final
int intTag = tagToIntMapping.get(tag); + DataStream<Object> sideInputStream = context.getInputDataStream(sideInput); + DataStream<RawUnionValue> unionValueStream = + sideInputStream.map(new ToRawUnion<>(intTag)).returns(unionTypeInformation); + + if (sideInputUnion == null) { + sideInputUnion = unionValueStream; + } else { + sideInputUnion = sideInputUnion.union(unionValueStream); + } + } + + if (sideInputUnion == null) { + throw new IllegalStateException("No unioned side inputs, this indicates a bug."); + } + + return new Tuple2<>(intToViewMapping, sideInputUnion); + } + + + private static class ParDoBoundMultiStreamingTranslator<InputT, OutputT> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + ParDo.BoundMulti<InputT, OutputT>> { + + @Override + public void translateNode( + ParDo.BoundMulti<InputT, OutputT> transform, + FlinkStreamingTranslationContext context) { + + DoFn<InputT, OutputT> doFn = transform.getFn(); + rejectSplittable(doFn); + + // we assume that the transformation does not change the windowing strategy. + WindowingStrategy<?, ?> windowingStrategy = + context.getInput(transform).getWindowingStrategy(); + + List<TaggedPValue> outputs = context.getOutputs(transform); + + Map<TupleTag<?>, Integer> tagsToLabels = + transformTupleTagsToLabels(transform.getMainOutputTag(), outputs); + + List<PCollectionView<?>> sideInputs = transform.getSideInputs(); + + SingleOutputStreamOperator<RawUnionValue> unionOutputStream; + + @SuppressWarnings("unchecked") + PCollection<InputT> inputPCollection = (PCollection<InputT>) context.getInput(transform); + + Coder<WindowedValue<InputT>> inputCoder = context.getCoder(inputPCollection); + + DataStream<WindowedValue<InputT>> inputDataStream = + context.getInputDataStream(context.getInput(transform)); + Coder keyCoder = null; + boolean stateful = false; + DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + if (signature.stateDeclarations().size() > 0 + || signature.timerDeclarations().size() > 0) { + // Based on the fact that the signature is stateful, DoFnSignatures ensures + // that it is also keyed + keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder(); + inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder)); + stateful = true; + } + + if (sideInputs.isEmpty()) { + DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = + new DoFnOperator<>( + transform.getFn(), + inputCoder, + transform.getMainOutputTag(), + transform.getSideOutputTags().getAll(), + new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), + windowingStrategy, + new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ + Collections.<PCollectionView<?>>emptyList(), /* side inputs */ + context.getPipelineOptions(), + keyCoder); + + UnionCoder outputUnionCoder = createUnionCoder(outputs); + + CoderTypeInformation<RawUnionValue> outputUnionTypeInformation = + new CoderTypeInformation<>(outputUnionCoder); + + unionOutputStream = inputDataStream + .transform(transform.getName(), outputUnionTypeInformation, doFnOperator); + + } else { + Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs = + transformSideInputs(sideInputs, context); + + DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = + new DoFnOperator<>( + transform.getFn(), + inputCoder, + transform.getMainOutputTag(), + transform.getSideOutputTags().getAll(), + new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), + windowingStrategy, + 
transformedSideInputs.f0, + sideInputs, + context.getPipelineOptions(), + keyCoder); + + UnionCoder outputUnionCoder = createUnionCoder(outputs); + + CoderTypeInformation<RawUnionValue> outputUnionTypeInformation = + new CoderTypeInformation<>(outputUnionCoder); + + if (stateful) { + // we have to manually construct the two-input transform because we're not + // allowed to have only one input keyed, normally. + KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream; + TwoInputTransformation< + WindowedValue<KV<?, InputT>>, + RawUnionValue, + WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation( + keyedStream.getTransformation(), + transformedSideInputs.f1.broadcast().getTransformation(), + transform.getName(), + (TwoInputStreamOperator) doFnOperator, + outputUnionTypeInformation, + keyedStream.getParallelism()); + + rawFlinkTransform.setStateKeyType(keyedStream.getKeyType()); + rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null); + + unionOutputStream = new SingleOutputStreamOperator( + keyedStream.getExecutionEnvironment(), + rawFlinkTransform) {}; // we have to cheat around the ctor being protected + + keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform); + + } else { + unionOutputStream = inputDataStream + .connect(transformedSideInputs.f1.broadcast()) + .transform(transform.getName(), outputUnionTypeInformation, doFnOperator); + } + } + + SplitStream<RawUnionValue> splitStream = unionOutputStream + .split(new OutputSelector<RawUnionValue>() { + @Override + public Iterable<String> select(RawUnionValue value) { + return Collections.singletonList(Integer.toString(value.getUnionTag())); + } + }); + + for (TaggedPValue output : outputs) { + final int outputTag = tagsToLabels.get(output.getTag()); + + TypeInformation outputTypeInfo = context.getTypeInfo((PCollection<?>) output.getValue()); + + @SuppressWarnings("unchecked") + DataStream unwrapped = splitStream.select(String.valueOf(outputTag)) + .flatMap(new FlatMapFunction<RawUnionValue, Object>() { + @Override + public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception { + out.collect(value.getValue()); + } + }).returns(outputTypeInfo); + + context.setOutputDataStream(output.getValue(), unwrapped); + } + } + + private Map<TupleTag<?>, Integer> transformTupleTagsToLabels( + TupleTag<?> mainTag, + List<TaggedPValue> allTaggedValues) { + + Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap(); + int count = 0; + tagToLabelMap.put(mainTag, count++); + for (TaggedPValue taggedPValue : allTaggedValues) { + if (!tagToLabelMap.containsKey(taggedPValue.getTag())) { + tagToLabelMap.put(taggedPValue.getTag(), count++); + } + } + return tagToLabelMap; + } + + private UnionCoder createUnionCoder(Collection<TaggedPValue> taggedCollections) { + List<Coder<?>> outputCoders = Lists.newArrayList(); + for (TaggedPValue taggedColl : taggedCollections) { + checkArgument( + taggedColl.getValue() instanceof PCollection, + "A Union Coder can only be created for a Collection of Tagged %s.
Got %s", + PCollection.class.getSimpleName(), + taggedColl.getValue().getClass().getSimpleName()); + PCollection<?> coll = (PCollection<?>) taggedColl.getValue(); + WindowedValue.FullWindowedValueCoder<?> windowedValueCoder = + WindowedValue.getFullCoder( + coll.getCoder(), + coll.getWindowingStrategy().getWindowFn().windowCoder()); + outputCoders.add(windowedValueCoder); + } + return UnionCoder.of(outputCoders); + } + } + + private static class CreateViewStreamingTranslator<ElemT, ViewT> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> { + + @Override + public void translateNode( + FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT> transform, + FlinkStreamingTranslationContext context) { + // just forward + DataStream<WindowedValue<List<ElemT>>> inputDataSet = + context.getInputDataStream(context.getInput(transform)); + + PCollectionView<ViewT> view = context.getOutput(transform); + + context.setOutputDataStream(view, inputDataSet); + } + } + + private static class WindowBoundTranslator<T> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> { + + @Override + public void translateNode( + Window.Bound<T> transform, + FlinkStreamingTranslationContext context) { + + @SuppressWarnings("unchecked") + WindowingStrategy<T, BoundedWindow> windowingStrategy = + (WindowingStrategy<T, BoundedWindow>) + context.getOutput(transform).getWindowingStrategy(); + + TypeInformation<WindowedValue<T>> typeInfo = + context.getTypeInfo(context.getOutput(transform)); + + DataStream<WindowedValue<T>> inputDataStream = + context.getInputDataStream(context.getInput(transform)); + + WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn(); + + FlinkAssignWindows<T, ? 
extends BoundedWindow> assignWindowsFunction = + new FlinkAssignWindows<>(windowFn); + + SingleOutputStreamOperator<WindowedValue<T>> outputDataStream = inputDataStream + .flatMap(assignWindowsFunction) + .name(context.getOutput(transform).getName()) + .returns(typeInfo); + + context.setOutputDataStream(context.getOutput(transform), outputDataStream); + } + } + + private static class ReshuffleTranslatorStreaming<K, InputT> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Reshuffle<K, InputT>> { + + @Override + public void translateNode( + Reshuffle<K, InputT> transform, + FlinkStreamingTranslationContext context) { + + DataStream<WindowedValue<KV<K, InputT>>> inputDataSet = + context.getInputDataStream(context.getInput(transform)); + + context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance()); + + } + } + + + private static class GroupByKeyTranslator<K, InputT> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> { + + @Override + public void translateNode( + GroupByKey<K, InputT> transform, + FlinkStreamingTranslationContext context) { + + PCollection<KV<K, InputT>> input = context.getInput(transform); + + @SuppressWarnings("unchecked") + WindowingStrategy<?, BoundedWindow> windowingStrategy = + (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy(); + + KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder(); + + SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of( + inputKvCoder.getKeyCoder(), + inputKvCoder.getValueCoder(), + input.getWindowingStrategy().getWindowFn().windowCoder()); + + DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input); + + WindowedValue. + FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder = + WindowedValue.getFullCoder( + workItemCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo = + new CoderTypeInformation<>(windowedWorkItemCoder); + + DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream = + inputDataStream + .flatMap(new CombinePerKeyTranslator.ToKeyedWorkItem<K, InputT>()) + .returns(workItemTypeInfo).name("ToKeyedWorkItem"); + + KeyedStream< + WindowedValue< + SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream + .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder())); + + SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, BoundedWindow> reduceFn = + SystemReduceFn.buffering(inputKvCoder.getValueCoder()); + + TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo = + context.getTypeInfo(context.getOutput(transform)); + + DoFnOperator.DefaultOutputManagerFactory< + WindowedValue<KV<K, Iterable<InputT>>>> outputManagerFactory = + new DoFnOperator.DefaultOutputManagerFactory<>(); + + WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator = + new WindowDoFnOperator<>( + reduceFn, + (Coder) windowedWorkItemCoder, + new TupleTag<KV<K, Iterable<InputT>>>("main output"), + Collections.<TupleTag<?>>emptyList(), + outputManagerFactory, + windowingStrategy, + new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ + Collections.<PCollectionView<?>>emptyList(), /* side inputs */ + context.getPipelineOptions(), + inputKvCoder.getKeyCoder()); + + // our operator expects WindowedValue<KeyedWorkItem> while our input stream + // is
WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ... + @SuppressWarnings("unchecked") + SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> outDataStream = + keyedWorkItemStream + .transform( + transform.getName(), + outputTypeInfo, + (OneInputStreamOperator) doFnOperator); + + context.setOutputDataStream(context.getOutput(transform), outDataStream); + + } + } + + private static class CombinePerKeyTranslator<K, InputT, OutputT> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + Combine.PerKey<K, InputT, OutputT>> { + + @Override + boolean canTranslate( + Combine.PerKey<K, InputT, OutputT> transform, + FlinkStreamingTranslationContext context) { + + // if we have a merging window strategy and side inputs we cannot + // translate as a proper combine. We have to group and then run the combine + // over the final grouped values. + PCollection<KV<K, InputT>> input = context.getInput(transform); + + @SuppressWarnings("unchecked") + WindowingStrategy<?, BoundedWindow> windowingStrategy = + (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy(); + + return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty(); + } + + @Override + public void translateNode( + Combine.PerKey<K, InputT, OutputT> transform, + FlinkStreamingTranslationContext context) { + + PCollection<KV<K, InputT>> input = context.getInput(transform); + + @SuppressWarnings("unchecked") + WindowingStrategy<?, BoundedWindow> windowingStrategy = + (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy(); + + KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder(); + + SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of( + inputKvCoder.getKeyCoder(), + inputKvCoder.getValueCoder(), + input.getWindowingStrategy().getWindowFn().windowCoder()); + + DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input); + + WindowedValue. 
+ FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder = + WindowedValue.getFullCoder( + workItemCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo = + new CoderTypeInformation<>(windowedWorkItemCoder); + + DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream = + inputDataStream + .flatMap(new ToKeyedWorkItem<K, InputT>()) + .returns(workItemTypeInfo).name("ToKeyedWorkItem"); + + KeyedStream< + WindowedValue< + SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream + .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder())); + + SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn = SystemReduceFn.combining( + inputKvCoder.getKeyCoder(), + AppliedCombineFn.withInputCoder( + transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder)); + + TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo = + context.getTypeInfo(context.getOutput(transform)); + + List<PCollectionView<?>> sideInputs = transform.getSideInputs(); + + if (sideInputs.isEmpty()) { + + WindowDoFnOperator<K, InputT, OutputT> doFnOperator = + new WindowDoFnOperator<>( + reduceFn, + (Coder) windowedWorkItemCoder, + new TupleTag<KV<K, OutputT>>("main output"), + Collections.<TupleTag<?>>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(), + windowingStrategy, + new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ + Collections.<PCollectionView<?>>emptyList(), /* side inputs */ + context.getPipelineOptions(), + inputKvCoder.getKeyCoder()); + + // our operator expects WindowedValue<KeyedWorkItem> while our input stream + // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ... + @SuppressWarnings("unchecked") + SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream = + keyedWorkItemStream.transform( + transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator); + + context.setOutputDataStream(context.getOutput(transform), outDataStream); + } else { + Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs = + transformSideInputs(sideInputs, context); + + WindowDoFnOperator<K, InputT, OutputT> doFnOperator = + new WindowDoFnOperator<>( + reduceFn, + (Coder) windowedWorkItemCoder, + new TupleTag<KV<K, OutputT>>("main output"), + Collections.<TupleTag<?>>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(), + windowingStrategy, + transformSideInputs.f0, + sideInputs, + context.getPipelineOptions(), + inputKvCoder.getKeyCoder()); + + // we have to manually construct the two-input transform because we're not + // allowed to have only one input keyed, normally.
+ + TwoInputTransformation< + WindowedValue<SingletonKeyedWorkItem<K, InputT>>, + RawUnionValue, + WindowedValue<KV<K, OutputT>>> rawFlinkTransform = new TwoInputTransformation<>( + keyedWorkItemStream.getTransformation(), + transformSideInputs.f1.broadcast().getTransformation(), + transform.getName(), + (TwoInputStreamOperator) doFnOperator, + outputTypeInfo, + keyedWorkItemStream.getParallelism()); + + rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType()); + rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null); + + @SuppressWarnings({ "unchecked", "rawtypes" }) + SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream = + new SingleOutputStreamOperator( + keyedWorkItemStream.getExecutionEnvironment(), + rawFlinkTransform) {}; // we have to cheat around the ctor being protected + + keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform); + + context.setOutputDataStream(context.getOutput(transform), outDataStream); + } + } + + private static class ToKeyedWorkItem<K, InputT> + extends RichFlatMapFunction< + WindowedValue<KV<K, InputT>>, + WindowedValue<SingletonKeyedWorkItem<K, InputT>>> { + + @Override + public void flatMap( + WindowedValue<KV<K, InputT>> inWithMultipleWindows, + Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception { + + // we need to wrap each element as one work item per window for now, + // since otherwise the PushbackSideInputRunner will not correctly + // determine whether side inputs are ready + for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) { + SingletonKeyedWorkItem<K, InputT> workItem = + new SingletonKeyedWorkItem<>( + in.getValue().getKey(), + in.withValue(in.getValue().getValue())); + + out.collect(in.withValue(workItem)); + } + } + } + } + + private static class FlattenPCollectionTranslator<T> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + Flatten.FlattenPCollectionList<T>> { + + @Override + public void translateNode( + Flatten.FlattenPCollectionList<T> transform, + FlinkStreamingTranslationContext context) { + List<TaggedPValue> allInputs = context.getInputs(transform); + + if (allInputs.isEmpty()) { + + // create an empty dummy source to satisfy downstream operations + // we cannot create an empty source in Flink, therefore we have to + // add a flatMap that simply never forwards the single element + DataStreamSource<String> dummySource = + context.getExecutionEnvironment().fromElements("dummy"); + + DataStream<WindowedValue<T>> result = dummySource.flatMap( + new FlatMapFunction<String, WindowedValue<T>>() { + @Override + public void flatMap( + String s, + Collector<WindowedValue<T>> collector) throws Exception { + // never return anything + } + }).returns( + new CoderTypeInformation<>( + WindowedValue.getFullCoder( + (Coder<T>) VoidCoder.of(), + GlobalWindow.Coder.INSTANCE))); + context.setOutputDataStream(context.getOutput(transform), result); + + } else { + DataStream<T> result = null; + for (TaggedPValue input : allInputs) { + DataStream<T> current = context.getInputDataStream(input.getValue()); + result = (result == null) ? current : result.union(current); + } + context.setOutputDataStream(context.getOutput(transform), result); + } + } + } +}
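
The TRANSLATORS map above keys each translator by the concrete PTransform class and looks it up by runtime class while the pipeline is traversed. Below is a minimal, self-contained sketch of that registry pattern, assuming nothing beyond the JDK; all names are illustrative stand-ins, not Beam or Flink API.

import java.util.HashMap;
import java.util.Map;

public class TranslatorRegistrySketch {

  /** Illustrative stand-in for a Beam PTransform. */
  interface Transform {}

  static class FlattenTransform implements Transform {}

  /** Illustrative stand-in for StreamTransformTranslator. */
  interface StreamTransformTranslator<T extends Transform> {
    void translateNode(T transform);
  }

  @SuppressWarnings("rawtypes")
  private static final Map<Class<? extends Transform>, StreamTransformTranslator> TRANSLATORS =
      new HashMap<>();

  static {
    // one entry per concrete transform class, as in the registry above
    TRANSLATORS.put(FlattenTransform.class, (StreamTransformTranslator<FlattenTransform>)
        transform -> System.out.println("translating " + transform.getClass().getSimpleName()));
  }

  @SuppressWarnings("unchecked")
  static <T extends Transform> StreamTransformTranslator<T> getTranslator(T transform) {
    // lookup by runtime class; null means the transform is not supported
    return (StreamTransformTranslator<T>) TRANSLATORS.get(transform.getClass());
  }

  public static void main(String[] args) {
    FlattenTransform flatten = new FlattenTransform();
    StreamTransformTranslator<FlattenTransform> translator = getTranslator(flatten);
    if (translator == null) {
      throw new UnsupportedOperationException("The transform is currently not supported.");
    }
    translator.translateNode(flatten);
  }
}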
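transformSideInputs above tags every side-input element with a small integer (ToRawUnion), unions the tagged streams into a single DataStream of RawUnionValue, and ParDoBoundMultiStreamingTranslator later routes elements back out by tag through an OutputSelector. Below is a plain-Java sketch of that tag-union-route idea, with in-memory lists standing in for streams; all names are illustrative.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaggedUnionSketch {

  /** Mirrors RawUnionValue: a value plus the integer tag of the stream it came from. */
  static final class TaggedValue {
    final int unionTag;
    final Object value;

    TaggedValue(int unionTag, Object value) {
      this.unionTag = unionTag;
      this.value = value;
    }
  }

  public static void main(String[] args) {
    // two side-input "streams" of different element types
    List<String> names = Arrays.asList("a", "b");
    List<Integer> counts = Arrays.asList(1, 2, 3);

    // assign a dense int tag per stream, as transformSideInputs does per PCollectionView
    Map<Integer, String> intToViewMapping = new HashMap<>();
    intToViewMapping.put(0, "names");
    intToViewMapping.put(1, "counts");

    // wrap every element with its stream's tag (the ToRawUnion step) and union the streams
    List<TaggedValue> union = new ArrayList<>();
    for (String v : names) {
      union.add(new TaggedValue(0, v));
    }
    for (Integer v : counts) {
      union.add(new TaggedValue(1, v));
    }

    // downstream, route each element back to its origin by tag (the OutputSelector step)
    for (TaggedValue tagged : union) {
      System.out.println(intToViewMapping.get(tagged.unionTag) + " -> " + tagged.value);
    }
  }
}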
http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java new file mode 100644 index 0000000..3d5b83f --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.Iterables; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * Helper for keeping track of which {@link DataStream DataStreams} map + * to which {@link PTransform PTransforms}. + */ +class FlinkStreamingTranslationContext { + + private final StreamExecutionEnvironment env; + private final PipelineOptions options; + + /** + * Keeps a mapping between the output value of the PTransform (in Beam) and the + * Flink Operator that produced it, after the translation of the corresponding PTransform + * to its Flink equivalent.
+ * */ + private final Map<PValue, DataStream<?>> dataStreams; + + private AppliedPTransform<?, ?, ?> currentTransform; + + public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) { + this.env = checkNotNull(env); + this.options = checkNotNull(options); + this.dataStreams = new HashMap<>(); + } + + public StreamExecutionEnvironment getExecutionEnvironment() { + return env; + } + + public PipelineOptions getPipelineOptions() { + return options; + } + + @SuppressWarnings("unchecked") + public <T> DataStream<T> getInputDataStream(PValue value) { + return (DataStream<T>) dataStreams.get(value); + } + + public void setOutputDataStream(PValue value, DataStream<?> set) { + if (!dataStreams.containsKey(value)) { + dataStreams.put(value, set); + } + } + + /** + * Sets the AppliedPTransform which carries input/output. + * @param currentTransform + */ + public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) { + this.currentTransform = currentTransform; + } + + public <T> Coder<WindowedValue<T>> getCoder(PCollection<T> collection) { + Coder<T> valueCoder = collection.getCoder(); + + return WindowedValue.getFullCoder( + valueCoder, + collection.getWindowingStrategy().getWindowFn().windowCoder()); + } + + @SuppressWarnings("unchecked") + public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) { + Coder<T> valueCoder = collection.getCoder(); + WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = + WindowedValue.getFullCoder( + valueCoder, + collection.getWindowingStrategy().getWindowFn().windowCoder()); + + return new CoderTypeInformation<>(windowedValueCoder); + } + + + @SuppressWarnings("unchecked") + public <T extends PValue> T getInput(PTransform<T, ?> transform) { + return (T) Iterables.getOnlyElement(currentTransform.getInputs()).getValue(); + } + + public <T extends PInput> List<TaggedPValue> getInputs(PTransform<T, ?> transform) { + return currentTransform.getInputs(); + } + + @SuppressWarnings("unchecked") + public <T extends PValue> T getOutput(PTransform<?, T> transform) { + return (T) Iterables.getOnlyElement(currentTransform.getOutputs()).getValue(); + } + + public <OutputT extends POutput> List<TaggedPValue> getOutputs(PTransform<?, OutputT> transform) { + return currentTransform.getOutputs(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java index 0a9df4e..0ff6367 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java @@ -38,7 +38,7 @@ import org.apache.beam.sdk.values.PCollectionView; /** * Flink streaming overrides for various view (side input) transforms. 
*/ -public class FlinkStreamingViewOverrides { +class FlinkStreamingViewOverrides { /** * Specialized implementation for http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java new file mode 100644 index 0000000..3acc3ea --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline. + */ +class PipelineTranslationOptimizer extends FlinkPipelineTranslator { + + private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslationOptimizer.class); + + private TranslationMode translationMode; + + private final FlinkPipelineOptions options; + + public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options) { + this.translationMode = defaultMode; + this.options = options; + } + + public TranslationMode getTranslationMode() { + + // override user-specified translation mode + if (options.isStreaming()) { + return TranslationMode.STREAMING; + } + + return translationMode; + } + + @Override + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(TransformHierarchy.Node node) {} + + @Override + public void visitPrimitiveTransform(TransformHierarchy.Node node) { + Class<? extends PTransform> transformClass = node.getTransform().getClass(); + if (transformClass == Read.Unbounded.class) { + LOG.info("Found {}. 
Switching to streaming execution.", transformClass); + translationMode = TranslationMode.STREAMING; + } + } + + @Override + public void visitValue(PValue value, TransformHierarchy.Node producer) {} +} http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java new file mode 100644 index 0000000..ad54750 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +/** + * The translation mode of the Beam Pipeline. + */ +enum TranslationMode { + + /** Uses the batch mode of Flink. */ + BATCH, + + /** Uses the streaming mode of Flink. */ + STREAMING + +} http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java deleted file mode 100644 index 7fb17c8..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.DiscardingOutputFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a - * Flink batch job. - */ -public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class); - - /** - * The necessary context in the case of a batch job. - */ - private final FlinkBatchTranslationContext batchContext; - - private int depth = 0; - - public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) { - this.batchContext = new FlinkBatchTranslationContext(env, options); - } - - @Override - @SuppressWarnings("rawtypes, unchecked") - public void translate(Pipeline pipeline) { - super.translate(pipeline); - - // terminate dangling DataSets - for (DataSet<?> dataSet: batchContext.getDanglingDataSets().values()) { - dataSet.output(new DiscardingOutputFormat()); - } - } - - // -------------------------------------------------------------------------------------------- - // Pipeline Visitor Methods - // -------------------------------------------------------------------------------------------- - - @Override - public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName()); - this.depth++; - - BatchTransformTranslator<?> translator = getTranslator(node); - - if (translator != null) { - applyBatchTransform(node.getTransform(), node, translator); - LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName()); - return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; - } else { - return CompositeBehavior.ENTER_TRANSFORM; - } - } - - @Override - public void leaveCompositeTransform(TransformHierarchy.Node node) { - this.depth--; - LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName()); - } - - @Override - public void visitPrimitiveTransform(TransformHierarchy.Node node) { - LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName()); - - // get the transformation corresponding to the node we are - // currently visiting and translate it into its Flink alternative. 
- PTransform<?, ?> transform = node.getTransform(); - BatchTransformTranslator<?> translator = - FlinkBatchTransformTranslators.getTranslator(transform); - if (translator == null) { - LOG.info(node.getTransform().getClass().toString()); - throw new UnsupportedOperationException("The transform " + transform - + " is currently not supported."); - } - applyBatchTransform(transform, node, translator); - } - - private <T extends PTransform<?, ?>> void applyBatchTransform( - PTransform<?, ?> transform, - TransformHierarchy.Node node, - BatchTransformTranslator<?> translator) { - - @SuppressWarnings("unchecked") - T typedTransform = (T) transform; - - @SuppressWarnings("unchecked") - BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator; - - // create the applied PTransform on the batchContext - batchContext.setCurrentTransform(node.toAppliedPTransform()); - typedTranslator.translateNode(typedTransform, batchContext); - } - - /** - * A translator of a {@link PTransform}. - */ - public interface BatchTransformTranslator<TransformT extends PTransform> { - void translateNode(TransformT transform, FlinkBatchTranslationContext context); - } - - /** - * Returns a translator for the given node, if it is possible, otherwise null. - */ - private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) { - PTransform<?, ?> transform = node.getTransform(); - - // Root of the graph is null - if (transform == null) { - return null; - } - - return FlinkBatchTransformTranslators.getTranslator(transform); - } -}
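
PipelineTranslationOptimizer above settles the TranslationMode by visiting every primitive transform: a single Read.Unbounded switches the detected mode to STREAMING, and a user-specified streaming flag overrides whatever was detected. Below is a minimal sketch of that decision, with plain strings standing in for the transform hierarchy; all names are illustrative stand-ins, not Beam API.

import java.util.Arrays;
import java.util.List;

public class TranslationModeSketch {

  enum TranslationMode { BATCH, STREAMING }

  public static void main(String[] args) {
    // primitive transforms encountered while visiting the pipeline
    List<String> primitiveTransforms = Arrays.asList("Read.Bounded", "ParDo", "Read.Unbounded");

    boolean forceStreaming = false; // stand-in for options.isStreaming()
    TranslationMode mode = TranslationMode.BATCH; // the default mode

    // visitPrimitiveTransform: one unbounded read is enough to switch to streaming
    for (String transform : primitiveTransforms) {
      if ("Read.Unbounded".equals(transform)) {
        mode = TranslationMode.STREAMING;
      }
    }

    // getTranslationMode: a user-specified streaming flag overrides the detected mode
    if (forceStreaming) {
      mode = TranslationMode.STREAMING;
    }

    System.out.println("Executing pipeline in " + mode + " mode");
  }
}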
