Repository: beam Updated Branches: refs/heads/master 88e842534 -> 453e37bc6
http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java deleted file mode 100644 index 742ce91..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ /dev/null @@ -1,1045 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.flink.translation; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.SystemReduceFn; -import org.apache.beam.runners.flink.FlinkRunner; -import org.apache.beam.runners.flink.FlinkStreamingViewOverrides; -import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; -import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.runners.flink.translation.types.FlinkCoder; -import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; -import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector; -import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem; -import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder; -import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator; -import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.Sink; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.Write; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.RawUnionValue; -import org.apache.beam.sdk.transforms.join.UnionCoder; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.AppliedCombineFn; -import org.apache.beam.sdk.util.Reshuffle; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.streaming.api.collector.selector.OutputSelector; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.KeyedStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.datastream.SplitStream; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.streaming.api.transformations.TwoInputTransformation; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class contains all the mappings between Beam and Flink - * <b>streaming</b> transformations. The {@link FlinkStreamingPipelineTranslator} - * traverses the Beam job and comes here to translate the encountered Beam transformations - * into Flink one, based on the mapping available in this class. - */ -public class FlinkStreamingTransformTranslators { - - // -------------------------------------------------------------------------------------------- - // Transform Translator Registry - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("rawtypes") - private static final Map< - Class<? extends PTransform>, - FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>(); - - // here you can find all the available translators. - static { - TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator()); - TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator()); - TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator()); - TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); - - TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator()); - TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator()); - - TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator()); - TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator()); - TRANSLATORS.put( - FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class, - new CreateViewStreamingTranslator()); - - TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming()); - TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator()); - TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator()); - } - - public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator( - PTransform<?, ?> transform) { - return TRANSLATORS.get(transform.getClass()); - } - - // -------------------------------------------------------------------------------------------- - // Transformation Implementations - // -------------------------------------------------------------------------------------------- - - private static class TextIOWriteBoundStreamingTranslator - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> { - - private static final Logger LOG = - LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class); - - @Override - public void translateNode( - TextIO.Write.Bound transform, - FlinkStreamingTranslationContext context) { - PValue input = context.getInput(transform); - DataStream<WindowedValue<String>> inputDataStream = context.getInputDataStream(input); - - String filenamePrefix = transform.getFilenamePrefix(); - String filenameSuffix = transform.getFilenameSuffix(); - boolean needsValidation = transform.needsValidation(); - int numShards = transform.getNumShards(); - String shardNameTemplate = transform.getShardNameTemplate(); - - // TODO: Implement these. We need Flink support for this. - LOG.warn( - "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", - needsValidation); - LOG.warn( - "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", - filenameSuffix); - LOG.warn( - "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", - shardNameTemplate); - - DataStream<String> dataSink = inputDataStream - .flatMap(new FlatMapFunction<WindowedValue<String>, String>() { - @Override - public void flatMap( - WindowedValue<String> value, - Collector<String> out) - throws Exception { - out.collect(value.getValue()); - } - }); - DataStreamSink<String> output = - dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE); - - if (numShards > 0) { - output.setParallelism(numShards); - } - } - } - - private static class WriteSinkStreamingTranslator<T> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> { - - @Override - public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) { - String name = transform.getName(); - PValue input = context.getInput(transform); - - Sink<T> sink = transform.getSink(); - if (!(sink instanceof UnboundedFlinkSink)) { - throw new UnsupportedOperationException( - "At the time, only unbounded Flink sinks are supported."); - } - - DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input); - - inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() { - @Override - public void flatMap(WindowedValue<T> value, Collector<Object> out) throws Exception { - out.collect(value.getValue()); - } - }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name); - } - } - - private static class UnboundedReadSourceTranslator<T> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> { - - @Override - public void translateNode( - Read.Unbounded<T> transform, - FlinkStreamingTranslationContext context) { - PCollection<T> output = context.getOutput(transform); - - TypeInformation<WindowedValue<T>> outputTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - - DataStream<WindowedValue<T>> source; - if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) { - @SuppressWarnings("unchecked") - UnboundedFlinkSource<T> flinkSourceFunction = - (UnboundedFlinkSource<T>) transform.getSource(); - - final AssignerWithPeriodicWatermarks<T> flinkAssigner = - flinkSourceFunction.getFlinkTimestampAssigner(); - - DataStream<T> flinkSource = context.getExecutionEnvironment() - .addSource(flinkSourceFunction.getFlinkSource()); - - flinkSourceFunction.setCoder( - new FlinkCoder<T>(flinkSource.getType(), - context.getExecutionEnvironment().getConfig())); - - source = flinkSource - .assignTimestampsAndWatermarks(flinkAssigner) - .flatMap(new FlatMapFunction<T, WindowedValue<T>>() { - @Override - public void flatMap(T s, Collector<WindowedValue<T>> collector) throws Exception { - collector.collect( - WindowedValue.of( - s, - new Instant(flinkAssigner.extractTimestamp(s, -1)), - GlobalWindow.INSTANCE, - PaneInfo.NO_FIRING)); - }}).returns(outputTypeInfo); - } else { - try { - UnboundedSourceWrapper<T, ?> sourceWrapper = - new UnboundedSourceWrapper<>( - context.getPipelineOptions(), - transform.getSource(), - context.getExecutionEnvironment().getParallelism()); - source = context - .getExecutionEnvironment() - .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo); - } catch (Exception e) { - throw new RuntimeException( - "Error while translating UnboundedSource: " + transform.getSource(), e); - } - } - - context.setOutputDataStream(output, source); - } - } - - private static class BoundedReadSourceTranslator<T> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> { - - @Override - public void translateNode( - Read.Bounded<T> transform, - FlinkStreamingTranslationContext context) { - PCollection<T> output = context.getOutput(transform); - - TypeInformation<WindowedValue<T>> outputTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - - - DataStream<WindowedValue<T>> source; - try { - BoundedSourceWrapper<T> sourceWrapper = - new BoundedSourceWrapper<>( - context.getPipelineOptions(), - transform.getSource(), - context.getExecutionEnvironment().getParallelism()); - source = context - .getExecutionEnvironment() - .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo); - } catch (Exception e) { - throw new RuntimeException( - "Error while translating BoundedSource: " + transform.getSource(), e); - } - - context.setOutputDataStream(output, source); - } - } - - private static void rejectSplittable(DoFn<?, ?> doFn) { - DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); - if (signature.processElement().isSplittable()) { - throw new UnsupportedOperationException( - String.format( - "%s does not currently support splittable DoFn: %s", - FlinkRunner.class.getSimpleName(), doFn)); - } - } - - private static class ParDoBoundStreamingTranslator<InputT, OutputT> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - ParDo.Bound<InputT, OutputT>> { - - @Override - public void translateNode( - ParDo.Bound<InputT, OutputT> transform, - FlinkStreamingTranslationContext context) { - - DoFn<InputT, OutputT> doFn = transform.getFn(); - rejectSplittable(doFn); - - WindowingStrategy<?, ?> windowingStrategy = - context.getOutput(transform).getWindowingStrategy(); - - TypeInformation<WindowedValue<OutputT>> typeInfo = - context.getTypeInfo(context.getOutput(transform)); - - List<PCollectionView<?>> sideInputs = transform.getSideInputs(); - - @SuppressWarnings("unchecked") - PCollection<InputT> inputPCollection = (PCollection<InputT>) context.getInput(transform); - - Coder<WindowedValue<InputT>> inputCoder = context.getCoder(inputPCollection); - - DataStream<WindowedValue<InputT>> inputDataStream = - context.getInputDataStream(context.getInput(transform)); - Coder keyCoder = null; - boolean stateful = false; - DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); - if (signature.stateDeclarations().size() > 0 - || signature.timerDeclarations().size() > 0) { - // Based on the fact that the signature is stateful, DoFnSignatures ensures - // that it is also keyed - keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder(); - inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder)); - stateful = true; - } - - if (sideInputs.isEmpty()) { - DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator = - new DoFnOperator<>( - transform.getFn(), - inputCoder, - new TupleTag<OutputT>("main output"), - Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(), - windowingStrategy, - new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ - Collections.<PCollectionView<?>>emptyList(), /* side inputs */ - context.getPipelineOptions(), - keyCoder); - - SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream = inputDataStream - .transform(transform.getName(), typeInfo, doFnOperator); - - context.setOutputDataStream(context.getOutput(transform), outDataStream); - } else { - Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs = - transformSideInputs(sideInputs, context); - - DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator = - new DoFnOperator<>( - transform.getFn(), - inputCoder, - new TupleTag<OutputT>("main output"), - Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(), - windowingStrategy, - transformedSideInputs.f0, - sideInputs, - context.getPipelineOptions(), - keyCoder); - - SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream; - if (stateful) { - // we have to manually contruct the two-input transform because we're not - // allowed to have only one input keyed, normally. - KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream; - TwoInputTransformation< - WindowedValue<KV<?, InputT>>, - RawUnionValue, - WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation<>( - keyedStream.getTransformation(), - transformedSideInputs.f1.broadcast().getTransformation(), - transform.getName(), - (TwoInputStreamOperator) doFnOperator, - typeInfo, - keyedStream.getParallelism()); - - rawFlinkTransform.setStateKeyType(keyedStream.getKeyType()); - rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null); - - outDataStream = new SingleOutputStreamOperator( - keyedStream.getExecutionEnvironment(), - rawFlinkTransform) {}; // we have to cheat around the ctor being protected - - keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform); - } else { - outDataStream = inputDataStream - .connect(transformedSideInputs.f1.broadcast()) - .transform(transform.getName(), typeInfo, doFnOperator); - } - context.setOutputDataStream(context.getOutput(transform), outDataStream); - } - } - } - - /** - * Wraps each element in a {@link RawUnionValue} with the given tag id. - */ - private static class ToRawUnion<T> implements MapFunction<T, RawUnionValue> { - private final int intTag; - - public ToRawUnion(int intTag) { - this.intTag = intTag; - } - - @Override - public RawUnionValue map(T o) throws Exception { - return new RawUnionValue(intTag, o); - } - } - - private static Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> - transformSideInputs( - Collection<PCollectionView<?>> sideInputs, - FlinkStreamingTranslationContext context) { - - // collect all side inputs - Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>(); - Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>(); - int count = 0; - for (PCollectionView<?> sideInput: sideInputs) { - TupleTag<?> tag = sideInput.getTagInternal(); - intToViewMapping.put(count, sideInput); - tagToIntMapping.put(tag, count); - count++; - Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal(); - } - - - List<Coder<?>> inputCoders = new ArrayList<>(); - for (PCollectionView<?> sideInput: sideInputs) { - DataStream<Object> sideInputStream = context.getInputDataStream(sideInput); - TypeInformation<Object> tpe = sideInputStream.getType(); - if (!(tpe instanceof CoderTypeInformation)) { - throw new IllegalStateException( - "Input Stream TypeInformation is no CoderTypeInformation."); - } - - Coder<?> coder = ((CoderTypeInformation) tpe).getCoder(); - inputCoders.add(coder); - } - - UnionCoder unionCoder = UnionCoder.of(inputCoders); - - CoderTypeInformation<RawUnionValue> unionTypeInformation = - new CoderTypeInformation<>(unionCoder); - - // transform each side input to RawUnionValue and union them - DataStream<RawUnionValue> sideInputUnion = null; - - for (PCollectionView<?> sideInput: sideInputs) { - TupleTag<?> tag = sideInput.getTagInternal(); - final int intTag = tagToIntMapping.get(tag); - DataStream<Object> sideInputStream = context.getInputDataStream(sideInput); - DataStream<RawUnionValue> unionValueStream = - sideInputStream.map(new ToRawUnion<>(intTag)).returns(unionTypeInformation); - - if (sideInputUnion == null) { - sideInputUnion = unionValueStream; - } else { - sideInputUnion = sideInputUnion.union(unionValueStream); - } - } - - if (sideInputUnion == null) { - throw new IllegalStateException("No unioned side inputs, this indicates a bug."); - } - - return new Tuple2<>(intToViewMapping, sideInputUnion); - } - - - private static class ParDoBoundMultiStreamingTranslator<InputT, OutputT> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - ParDo.BoundMulti<InputT, OutputT>> { - - @Override - public void translateNode( - ParDo.BoundMulti<InputT, OutputT> transform, - FlinkStreamingTranslationContext context) { - - DoFn<InputT, OutputT> doFn = transform.getFn(); - rejectSplittable(doFn); - - // we assume that the transformation does not change the windowing strategy. - WindowingStrategy<?, ?> windowingStrategy = - context.getInput(transform).getWindowingStrategy(); - - List<TaggedPValue> outputs = context.getOutputs(transform); - - Map<TupleTag<?>, Integer> tagsToLabels = - transformTupleTagsToLabels(transform.getMainOutputTag(), outputs); - - List<PCollectionView<?>> sideInputs = transform.getSideInputs(); - - SingleOutputStreamOperator<RawUnionValue> unionOutputStream; - - @SuppressWarnings("unchecked") - PCollection<InputT> inputPCollection = (PCollection<InputT>) context.getInput(transform); - - Coder<WindowedValue<InputT>> inputCoder = context.getCoder(inputPCollection); - - DataStream<WindowedValue<InputT>> inputDataStream = - context.getInputDataStream(context.getInput(transform)); - Coder keyCoder = null; - boolean stateful = false; - DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); - if (signature.stateDeclarations().size() > 0 - || signature.timerDeclarations().size() > 0) { - // Based on the fact that the signature is stateful, DoFnSignatures ensures - // that it is also keyed - keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder(); - inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder)); - stateful = true; - } - - if (sideInputs.isEmpty()) { - DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = - new DoFnOperator<>( - transform.getFn(), - inputCoder, - transform.getMainOutputTag(), - transform.getSideOutputTags().getAll(), - new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), - windowingStrategy, - new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ - Collections.<PCollectionView<?>>emptyList(), /* side inputs */ - context.getPipelineOptions(), - keyCoder); - - UnionCoder outputUnionCoder = createUnionCoder(outputs); - - CoderTypeInformation<RawUnionValue> outputUnionTypeInformation = - new CoderTypeInformation<>(outputUnionCoder); - - unionOutputStream = inputDataStream - .transform(transform.getName(), outputUnionTypeInformation, doFnOperator); - - } else { - Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs = - transformSideInputs(sideInputs, context); - - DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = - new DoFnOperator<>( - transform.getFn(), - inputCoder, - transform.getMainOutputTag(), - transform.getSideOutputTags().getAll(), - new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), - windowingStrategy, - transformedSideInputs.f0, - sideInputs, - context.getPipelineOptions(), - keyCoder); - - UnionCoder outputUnionCoder = createUnionCoder(outputs); - - CoderTypeInformation<RawUnionValue> outputUnionTypeInformation = - new CoderTypeInformation<>(outputUnionCoder); - - if (stateful) { - // we have to manually contruct the two-input transform because we're not - // allowed to have only one input keyed, normally. - KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream; - TwoInputTransformation< - WindowedValue<KV<?, InputT>>, - RawUnionValue, - WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation( - keyedStream.getTransformation(), - transformedSideInputs.f1.broadcast().getTransformation(), - transform.getName(), - (TwoInputStreamOperator) doFnOperator, - outputUnionTypeInformation, - keyedStream.getParallelism()); - - rawFlinkTransform.setStateKeyType(keyedStream.getKeyType()); - rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null); - - unionOutputStream = new SingleOutputStreamOperator( - keyedStream.getExecutionEnvironment(), - rawFlinkTransform) {}; // we have to cheat around the ctor being protected - - keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform); - - } else { - unionOutputStream = inputDataStream - .connect(transformedSideInputs.f1.broadcast()) - .transform(transform.getName(), outputUnionTypeInformation, doFnOperator); - } - } - - SplitStream<RawUnionValue> splitStream = unionOutputStream - .split(new OutputSelector<RawUnionValue>() { - @Override - public Iterable<String> select(RawUnionValue value) { - return Collections.singletonList(Integer.toString(value.getUnionTag())); - } - }); - - for (TaggedPValue output : outputs) { - final int outputTag = tagsToLabels.get(output.getTag()); - - TypeInformation outputTypeInfo = context.getTypeInfo((PCollection<?>) output.getValue()); - - @SuppressWarnings("unchecked") - DataStream unwrapped = splitStream.select(String.valueOf(outputTag)) - .flatMap(new FlatMapFunction<RawUnionValue, Object>() { - @Override - public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception { - out.collect(value.getValue()); - } - }).returns(outputTypeInfo); - - context.setOutputDataStream(output.getValue(), unwrapped); - } - } - - private Map<TupleTag<?>, Integer> transformTupleTagsToLabels( - TupleTag<?> mainTag, - List<TaggedPValue> allTaggedValues) { - - Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap(); - int count = 0; - tagToLabelMap.put(mainTag, count++); - for (TaggedPValue taggedPValue : allTaggedValues) { - if (!tagToLabelMap.containsKey(taggedPValue.getTag())) { - tagToLabelMap.put(taggedPValue.getTag(), count++); - } - } - return tagToLabelMap; - } - - private UnionCoder createUnionCoder(Collection<TaggedPValue> taggedCollections) { - List<Coder<?>> outputCoders = Lists.newArrayList(); - for (TaggedPValue taggedColl : taggedCollections) { - checkArgument( - taggedColl.getValue() instanceof PCollection, - "A Union Coder can only be created for a Collection of Tagged %s. Got %s", - PCollection.class.getSimpleName(), - taggedColl.getValue().getClass().getSimpleName()); - PCollection<?> coll = (PCollection<?>) taggedColl.getValue(); - WindowedValue.FullWindowedValueCoder<?> windowedValueCoder = - WindowedValue.getFullCoder( - coll.getCoder(), - coll.getWindowingStrategy().getWindowFn().windowCoder()); - outputCoders.add(windowedValueCoder); - } - return UnionCoder.of(outputCoders); - } - } - - private static class CreateViewStreamingTranslator<ElemT, ViewT> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> { - - @Override - public void translateNode( - FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT> transform, - FlinkStreamingTranslationContext context) { - // just forward - DataStream<WindowedValue<List<ElemT>>> inputDataSet = - context.getInputDataStream(context.getInput(transform)); - - PCollectionView<ViewT> view = context.getOutput(transform); - - context.setOutputDataStream(view, inputDataSet); - } - } - - private static class WindowBoundTranslator<T> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> { - - @Override - public void translateNode( - Window.Bound<T> transform, - FlinkStreamingTranslationContext context) { - - @SuppressWarnings("unchecked") - WindowingStrategy<T, BoundedWindow> windowingStrategy = - (WindowingStrategy<T, BoundedWindow>) - context.getOutput(transform).getWindowingStrategy(); - - TypeInformation<WindowedValue<T>> typeInfo = - context.getTypeInfo(context.getOutput(transform)); - - DataStream<WindowedValue<T>> inputDataStream = - context.getInputDataStream(context.getInput(transform)); - - WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn(); - - FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction = - new FlinkAssignWindows<>(windowFn); - - SingleOutputStreamOperator<WindowedValue<T>> outputDataStream = inputDataStream - .flatMap(assignWindowsFunction) - .name(context.getOutput(transform).getName()) - .returns(typeInfo); - - context.setOutputDataStream(context.getOutput(transform), outputDataStream); - } - } - - private static class ReshuffleTranslatorStreaming<K, InputT> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Reshuffle<K, InputT>> { - - @Override - public void translateNode( - Reshuffle<K, InputT> transform, - FlinkStreamingTranslationContext context) { - - DataStream<WindowedValue<KV<K, InputT>>> inputDataSet = - context.getInputDataStream(context.getInput(transform)); - - context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance()); - - } - } - - - private static class GroupByKeyTranslator<K, InputT> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> { - - @Override - public void translateNode( - GroupByKey<K, InputT> transform, - FlinkStreamingTranslationContext context) { - - PCollection<KV<K, InputT>> input = context.getInput(transform); - - @SuppressWarnings("unchecked") - WindowingStrategy<?, BoundedWindow> windowingStrategy = - (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy(); - - KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder(); - - SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of( - inputKvCoder.getKeyCoder(), - inputKvCoder.getValueCoder(), - input.getWindowingStrategy().getWindowFn().windowCoder()); - - DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input); - - WindowedValue. - FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder = - WindowedValue.getFullCoder( - workItemCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()); - - CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo = - new CoderTypeInformation<>(windowedWorkItemCoder); - - DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream = - inputDataStream - .flatMap(new CombinePerKeyTranslator.ToKeyedWorkItem<K, InputT>()) - .returns(workItemTypeInfo).name("ToKeyedWorkItem"); - - KeyedStream< - WindowedValue< - SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream - .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder())); - - SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, BoundedWindow> reduceFn = - SystemReduceFn.buffering(inputKvCoder.getValueCoder()); - - TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - - DoFnOperator.DefaultOutputManagerFactory< - WindowedValue<KV<K, Iterable<InputT>>>> outputManagerFactory = - new DoFnOperator.DefaultOutputManagerFactory<>(); - - WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator = - new WindowDoFnOperator<>( - reduceFn, - (Coder) windowedWorkItemCoder, - new TupleTag<KV<K, Iterable<InputT>>>("main output"), - Collections.<TupleTag<?>>emptyList(), - outputManagerFactory, - windowingStrategy, - new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ - Collections.<PCollectionView<?>>emptyList(), /* side inputs */ - context.getPipelineOptions(), - inputKvCoder.getKeyCoder()); - - // our operator excepts WindowedValue<KeyedWorkItem> while our input stream - // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ... - @SuppressWarnings("unchecked") - SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> outDataStream = - keyedWorkItemStream - .transform( - transform.getName(), - outputTypeInfo, - (OneInputStreamOperator) doFnOperator); - - context.setOutputDataStream(context.getOutput(transform), outDataStream); - - } - } - - private static class CombinePerKeyTranslator<K, InputT, OutputT> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - Combine.PerKey<K, InputT, OutputT>> { - - @Override - boolean canTranslate( - Combine.PerKey<K, InputT, OutputT> transform, - FlinkStreamingTranslationContext context) { - - // if we have a merging window strategy and side inputs we cannot - // translate as a proper combine. We have to group and then run the combine - // over the final grouped values. - PCollection<KV<K, InputT>> input = context.getInput(transform); - - @SuppressWarnings("unchecked") - WindowingStrategy<?, BoundedWindow> windowingStrategy = - (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy(); - - return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty(); - } - - @Override - public void translateNode( - Combine.PerKey<K, InputT, OutputT> transform, - FlinkStreamingTranslationContext context) { - - PCollection<KV<K, InputT>> input = context.getInput(transform); - - @SuppressWarnings("unchecked") - WindowingStrategy<?, BoundedWindow> windowingStrategy = - (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy(); - - KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder(); - - SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of( - inputKvCoder.getKeyCoder(), - inputKvCoder.getValueCoder(), - input.getWindowingStrategy().getWindowFn().windowCoder()); - - DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input); - - WindowedValue. - FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder = - WindowedValue.getFullCoder( - workItemCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()); - - CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo = - new CoderTypeInformation<>(windowedWorkItemCoder); - - DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream = - inputDataStream - .flatMap(new ToKeyedWorkItem<K, InputT>()) - .returns(workItemTypeInfo).name("ToKeyedWorkItem"); - - KeyedStream< - WindowedValue< - SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream - .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder())); - - SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn = SystemReduceFn.combining( - inputKvCoder.getKeyCoder(), - AppliedCombineFn.withInputCoder( - transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder)); - - TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - - List<PCollectionView<?>> sideInputs = transform.getSideInputs(); - - if (sideInputs.isEmpty()) { - - WindowDoFnOperator<K, InputT, OutputT> doFnOperator = - new WindowDoFnOperator<>( - reduceFn, - (Coder) windowedWorkItemCoder, - new TupleTag<KV<K, OutputT>>("main output"), - Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(), - windowingStrategy, - new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ - Collections.<PCollectionView<?>>emptyList(), /* side inputs */ - context.getPipelineOptions(), - inputKvCoder.getKeyCoder()); - - // our operator excepts WindowedValue<KeyedWorkItem> while our input stream - // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ... - @SuppressWarnings("unchecked") - SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream = - keyedWorkItemStream.transform( - transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator); - - context.setOutputDataStream(context.getOutput(transform), outDataStream); - } else { - Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs = - transformSideInputs(sideInputs, context); - - WindowDoFnOperator<K, InputT, OutputT> doFnOperator = - new WindowDoFnOperator<>( - reduceFn, - (Coder) windowedWorkItemCoder, - new TupleTag<KV<K, OutputT>>("main output"), - Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(), - windowingStrategy, - transformSideInputs.f0, - sideInputs, - context.getPipelineOptions(), - inputKvCoder.getKeyCoder()); - - // we have to manually contruct the two-input transform because we're not - // allowed to have only one input keyed, normally. - - TwoInputTransformation< - WindowedValue<SingletonKeyedWorkItem<K, InputT>>, - RawUnionValue, - WindowedValue<KV<K, OutputT>>> rawFlinkTransform = new TwoInputTransformation<>( - keyedWorkItemStream.getTransformation(), - transformSideInputs.f1.broadcast().getTransformation(), - transform.getName(), - (TwoInputStreamOperator) doFnOperator, - outputTypeInfo, - keyedWorkItemStream.getParallelism()); - - rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType()); - rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null); - - @SuppressWarnings({ "unchecked", "rawtypes" }) - SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream = - new SingleOutputStreamOperator( - keyedWorkItemStream.getExecutionEnvironment(), - rawFlinkTransform) {}; // we have to cheat around the ctor being protected - - keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform); - - context.setOutputDataStream(context.getOutput(transform), outDataStream); - } - } - - private static class ToKeyedWorkItem<K, InputT> - extends RichFlatMapFunction< - WindowedValue<KV<K, InputT>>, - WindowedValue<SingletonKeyedWorkItem<K, InputT>>> { - - @Override - public void flatMap( - WindowedValue<KV<K, InputT>> inWithMultipleWindows, - Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception { - - // we need to wrap each one work item per window for now - // since otherwise the PushbackSideInputRunner will not correctly - // determine whether side inputs are ready - for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) { - SingletonKeyedWorkItem<K, InputT> workItem = - new SingletonKeyedWorkItem<>( - in.getValue().getKey(), - in.withValue(in.getValue().getValue())); - - in.withValue(workItem); - out.collect(in.withValue(workItem)); - } - } - } - } - - private static class FlattenPCollectionTranslator<T> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - Flatten.FlattenPCollectionList<T>> { - - @Override - public void translateNode( - Flatten.FlattenPCollectionList<T> transform, - FlinkStreamingTranslationContext context) { - List<TaggedPValue> allInputs = context.getInputs(transform); - - if (allInputs.isEmpty()) { - - // create an empty dummy source to satisfy downstream operations - // we cannot create an empty source in Flink, therefore we have to - // add the flatMap that simply never forwards the single element - DataStreamSource<String> dummySource = - context.getExecutionEnvironment().fromElements("dummy"); - - DataStream<WindowedValue<T>> result = dummySource.flatMap( - new FlatMapFunction<String, WindowedValue<T>>() { - @Override - public void flatMap( - String s, - Collector<WindowedValue<T>> collector) throws Exception { - // never return anything - } - }).returns( - new CoderTypeInformation<>( - WindowedValue.getFullCoder( - (Coder<T>) VoidCoder.of(), - GlobalWindow.Coder.INSTANCE))); - context.setOutputDataStream(context.getOutput(transform), result); - - } else { - DataStream<T> result = null; - for (TaggedPValue input : allInputs) { - DataStream<T> current = context.getInputDataStream(input.getValue()); - result = (result == null) ? current : result.union(current); - } - context.setOutputDataStream(context.getOutput(transform), result); - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java deleted file mode 100644 index 03698d5..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.Iterables; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -/** - * Helper for keeping track of which {@link DataStream DataStreams} map - * to which {@link PTransform PTransforms}. - */ -public class FlinkStreamingTranslationContext { - - private final StreamExecutionEnvironment env; - private final PipelineOptions options; - - /** - * Keeps a mapping between the output value of the PTransform (in Dataflow) and the - * Flink Operator that produced it, after the translation of the correspondinf PTransform - * to its Flink equivalent. - * */ - private final Map<PValue, DataStream<?>> dataStreams; - - private AppliedPTransform<?, ?, ?> currentTransform; - - public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) { - this.env = checkNotNull(env); - this.options = checkNotNull(options); - this.dataStreams = new HashMap<>(); - } - - public StreamExecutionEnvironment getExecutionEnvironment() { - return env; - } - - public PipelineOptions getPipelineOptions() { - return options; - } - - @SuppressWarnings("unchecked") - public <T> DataStream<T> getInputDataStream(PValue value) { - return (DataStream<T>) dataStreams.get(value); - } - - public void setOutputDataStream(PValue value, DataStream<?> set) { - if (!dataStreams.containsKey(value)) { - dataStreams.put(value, set); - } - } - - /** - * Sets the AppliedPTransform which carries input/output. - * @param currentTransform - */ - public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) { - this.currentTransform = currentTransform; - } - - public <T> Coder<WindowedValue<T>> getCoder(PCollection<T> collection) { - Coder<T> valueCoder = collection.getCoder(); - - return WindowedValue.getFullCoder( - valueCoder, - collection.getWindowingStrategy().getWindowFn().windowCoder()); - } - - @SuppressWarnings("unchecked") - public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) { - Coder<T> valueCoder = collection.getCoder(); - WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = - WindowedValue.getFullCoder( - valueCoder, - collection.getWindowingStrategy().getWindowFn().windowCoder()); - - return new CoderTypeInformation<>(windowedValueCoder); - } - - - @SuppressWarnings("unchecked") - public <T extends PValue> T getInput(PTransform<T, ?> transform) { - return (T) Iterables.getOnlyElement(currentTransform.getInputs()).getValue(); - } - - public <T extends PInput> List<TaggedPValue> getInputs(PTransform<T, ?> transform) { - return currentTransform.getInputs(); - } - - @SuppressWarnings("unchecked") - public <T extends PValue> T getOutput(PTransform<?, T> transform) { - return (T) Iterables.getOnlyElement(currentTransform.getOutputs()).getValue(); - } - - public <OutputT extends POutput> List<TaggedPValue> getOutputs(PTransform<?, OutputT> transform) { - return currentTransform.getOutputs(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/PipelineTranslationOptimizer.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/PipelineTranslationOptimizer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/PipelineTranslationOptimizer.java deleted file mode 100644 index 99f7ceb..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/PipelineTranslationOptimizer.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation; - -import org.apache.beam.runners.flink.FlinkPipelineOptions; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline. - */ -public class PipelineTranslationOptimizer extends FlinkPipelineTranslator { - - private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslationOptimizer.class); - - private TranslationMode translationMode; - - private final FlinkPipelineOptions options; - - public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options) { - this.translationMode = defaultMode; - this.options = options; - } - - public TranslationMode getTranslationMode() { - - // override user-specified translation mode - if (options.isStreaming()) { - return TranslationMode.STREAMING; - } - - return translationMode; - } - - @Override - public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - return CompositeBehavior.ENTER_TRANSFORM; - } - - @Override - public void leaveCompositeTransform(TransformHierarchy.Node node) {} - - @Override - public void visitPrimitiveTransform(TransformHierarchy.Node node) { - Class<? extends PTransform> transformClass = node.getTransform().getClass(); - if (transformClass == Read.Unbounded.class) { - LOG.info("Found {}. Switching to streaming execution.", transformClass); - translationMode = TranslationMode.STREAMING; - } - } - - @Override - public void visitValue(PValue value, TransformHierarchy.Node producer) {} -} http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java deleted file mode 100644 index 57b69aa..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation; - -/** - * The translation mode of the Beam Pipeline. - */ -public enum TranslationMode { - - /** Uses the batch mode of Flink. */ - BATCH, - - /** Uses the streaming mode of Flink. */ - STREAMING - -}
