Fix Checkstyle Errors in FlinkStreamingTransformTranslators
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ff34f9e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ff34f9e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ff34f9e8

Branch: refs/heads/master
Commit: ff34f9e81867a656e4d9dd0987063c58cbb1de88
Parents: 1de76b7
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Sun Jul 10 17:01:38 2016 +0200
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Aug 24 12:46:24 2016 -0700

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java | 155 ++++++++++++-------
 1 file changed, 102 insertions(+), 53 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ff34f9e8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index fff629c..8167623 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -106,7 +106,9 @@ public class FlinkStreamingTransformTranslators {
   // --------------------------------------------------------------------------------------------
 
   @SuppressWarnings("rawtypes")
-  private static final Map<Class<? extends PTransform>, FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>();
+  private static final Map<
+      Class<?
extends PTransform>, + FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>(); // here you can find all the available translators. static { @@ -125,7 +127,8 @@ public class FlinkStreamingTransformTranslators { TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator()); } - public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(PTransform<?, ?> transform) { + public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator( + PTransform<?, ?> transform) { return TRANSLATORS.get(transform.getClass()); } @@ -133,21 +136,24 @@ public class FlinkStreamingTransformTranslators { // Transformation Implementations // -------------------------------------------------------------------------------------------- - private static class CreateStreamingTranslator<OUT> implements - FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OUT>> { + private static class CreateStreamingTranslator<OutputT> implements + FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OutputT>> { @Override - public void translateNode(Create.Values<OUT> transform, FlinkStreamingTranslationContext context) { - PCollection<OUT> output = context.getOutput(transform); - Iterable<OUT> elements = transform.getElements(); + public void translateNode( + Create.Values<OutputT> transform, + FlinkStreamingTranslationContext context) { + + PCollection<OutputT> output = context.getOutput(transform); + Iterable<OutputT> elements = transform.getElements(); // we need to serialize the elements to byte arrays, since they might contain // elements that are not serializable by Java serialization. We deserialize them // in the FlatMap function using the Coder. 
List<byte[]> serializedElements = Lists.newArrayList(); - Coder<OUT> elementCoder = output.getCoder(); - for (OUT element: elements) { + Coder<OutputT> elementCoder = output.getCoder(); + for (OutputT element: elements) { ByteArrayOutputStream bao = new ByteArrayOutputStream(); try { elementCoder.encode(element, bao, Coder.Context.OUTER); @@ -160,25 +166,33 @@ public class FlinkStreamingTransformTranslators { DataStream<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1); - FlinkStreamingCreateFunction<Integer, OUT> createFunction = + FlinkStreamingCreateFunction<Integer, OutputT> createFunction = new FlinkStreamingCreateFunction<>(serializedElements, elementCoder); - WindowedValue.ValueOnlyWindowedValueCoder<OUT> windowCoder = WindowedValue.getValueOnlyCoder(elementCoder); - TypeInformation<WindowedValue<OUT>> outputType = new CoderTypeInformation<>(windowCoder); + WindowedValue.ValueOnlyWindowedValueCoder<OutputT> windowCoder = + WindowedValue.getValueOnlyCoder(elementCoder); + + TypeInformation<WindowedValue<OutputT>> outputType = new CoderTypeInformation<>(windowCoder); - DataStream<WindowedValue<OUT>> outputDataStream = initDataSet.flatMap(createFunction) - .returns(outputType); + DataStream<WindowedValue<OutputT>> outputDataStream = initDataSet + .flatMap(createFunction).returns(outputType); context.setOutputDataStream(output, outputDataStream); } } - private static class TextIOWriteBoundStreamingTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound<T>> { - private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class); + private static class TextIOWriteBoundStreamingTranslator<T> + implements FlinkStreamingPipelineTranslator.StreamTransformTranslator< + TextIO.Write.Bound<T>> { + + private static final Logger LOG = + LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class); @Override - public void translateNode(TextIO.Write.Bound<T> 
transform, FlinkStreamingTranslationContext context) { + public void translateNode( + TextIO.Write.Bound<T> transform, + FlinkStreamingTranslationContext context) { PValue input = context.getInput(transform); DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(input); @@ -189,17 +203,25 @@ public class FlinkStreamingTransformTranslators { String shardNameTemplate = transform.getShardNameTemplate(); // TODO: Implement these. We need Flink support for this. - LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", needsValidation); - LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", filenameSuffix); - LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate); - - DataStream<String> dataSink = inputDataStream.flatMap(new FlatMapFunction<WindowedValue<T>, String>() { - @Override - public void flatMap(WindowedValue<T> value, Collector<String> out) throws Exception { - out.collect(value.getValue().toString()); - } - }); - DataStreamSink<String> output = dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE); + LOG.warn( + "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", + needsValidation); + LOG.warn( + "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", + filenameSuffix); + LOG.warn( + "Translation of TextIO.Write.shardNameTemplate not yet supported. 
Is: {}.", + shardNameTemplate); + + DataStream<String> dataSink = inputDataStream + .flatMap(new FlatMapFunction<WindowedValue<T>, String>() { + @Override + public void flatMap(WindowedValue<T> value, Collector<String> out) throws Exception { + out.collect(value.getValue().toString()); + } + }); + DataStreamSink<String> output = + dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE); if (numShards > 0) { output.setParallelism(numShards); @@ -207,7 +229,8 @@ public class FlinkStreamingTransformTranslators { } } - private static class WriteSinkStreamingTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> { + private static class WriteSinkStreamingTranslator<T> + implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> { @Override public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) { @@ -216,7 +239,8 @@ public class FlinkStreamingTransformTranslators { Sink<T> sink = transform.getSink(); if (!(sink instanceof UnboundedFlinkSink)) { - throw new UnsupportedOperationException("At the time, only unbounded Flink sinks are supported."); + throw new UnsupportedOperationException( + "At the time, only unbounded Flink sinks are supported."); } DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input); @@ -251,16 +275,21 @@ public class FlinkStreamingTransformTranslators { } } - private static class UnboundedReadSourceTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> { + private static class UnboundedReadSourceTranslator<T> + implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> { @Override - public void translateNode(Read.Unbounded<T> transform, FlinkStreamingTranslationContext context) { + public void translateNode( + Read.Unbounded<T> transform, + FlinkStreamingTranslationContext context) { PCollection<T> output = 
context.getOutput(transform); DataStream<WindowedValue<T>> source; if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) { @SuppressWarnings("unchecked") - UnboundedFlinkSource<T> flinkSourceFunction = (UnboundedFlinkSource<T>) transform.getSource(); + UnboundedFlinkSource<T> flinkSourceFunction = + (UnboundedFlinkSource<T>) transform.getSource(); + final AssignerWithPeriodicWatermarks<T> flinkAssigner = flinkSourceFunction.getFlinkTimestampAssigner(); DataStream<T> flinkSource = context.getExecutionEnvironment() @@ -290,9 +319,12 @@ public class FlinkStreamingTransformTranslators { context.getPipelineOptions(), transform.getSource(), context.getExecutionEnvironment().getParallelism()); - source = context.getExecutionEnvironment().addSource(sourceWrapper).name(transform.getName()); + source = context + .getExecutionEnvironment() + .addSource(sourceWrapper).name(transform.getName()); } catch (Exception e) { - throw new RuntimeException("Error while translating UnboundedSource: " + transform.getSource(), e); + throw new RuntimeException( + "Error while translating UnboundedSource: " + transform.getSource(), e); } } @@ -300,33 +332,41 @@ public class FlinkStreamingTransformTranslators { } } - private static class ParDoBoundStreamingTranslator<IN, OUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.Bound<IN, OUT>> { + private static class ParDoBoundStreamingTranslator<InputT, OutputT> + implements FlinkStreamingPipelineTranslator.StreamTransformTranslator< + ParDo.Bound<InputT, OutputT>> { @Override - public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkStreamingTranslationContext context) { + public void translateNode( + ParDo.Bound<InputT, OutputT> transform, + FlinkStreamingTranslationContext context) { - WindowingStrategy<?, ?> windowingStrategy = context.getOutput(transform).getWindowingStrategy(); + WindowingStrategy<?, ?> windowingStrategy = + context.getOutput(transform).getWindowingStrategy(); - 
TypeInformation<WindowedValue<OUT>> typeInfo = context.getTypeInfo(context.getOutput(transform)); + TypeInformation<WindowedValue<OutputT>> typeInfo = + context.getTypeInfo(context.getOutput(transform)); - DoFnOperator<IN, OUT, WindowedValue<OUT>> doFnOperator = new DoFnOperator<>( + DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator = new DoFnOperator<>( transform.getFn(), - new TupleTag<OUT>("main output"), + new TupleTag<OutputT>("main output"), Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OUT>>(), + new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(), windowingStrategy, new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(), context.getPipelineOptions()); - DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform)); - SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = inputDataStream - .transform(transform.getName(), typeInfo, doFnOperator); + DataStream<WindowedValue<InputT>> inputDataStream = + context.getInputDataStream(context.getInput(transform)); + + SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream = + inputDataStream.transform(transform.getName(), typeInfo, doFnOperator); context.setOutputDataStream(context.getOutput(transform), outDataStream); } } - public static class WindowBoundTranslator<T> + private static class WindowBoundTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> { @Override @@ -393,11 +433,13 @@ public class FlinkStreamingTransformTranslators { } } - public static class GroupByKeyTranslator<K, InputT> + private static class GroupByKeyTranslator<K, InputT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> { @Override - public void translateNode(GroupByKey<K, InputT> transform, FlinkStreamingTranslationContext context) { + public void translateNode( + GroupByKey<K, InputT> transform, + 
FlinkStreamingTranslationContext context) { PCollection<KV<K, InputT>> input = context.getInput(transform); @@ -471,7 +513,7 @@ public class FlinkStreamingTransformTranslators { } } - public static class CombinePerKeyTranslator<K, InputT, OutputT> + private static class CombinePerKeyTranslator<K, InputT, OutputT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator< Combine.PerKey<K, InputT, OutputT>> { @@ -566,10 +608,14 @@ public class FlinkStreamingTransformTranslators { } } - public static class FlattenPCollectionTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Flatten.FlattenPCollectionList<T>> { + private static class FlattenPCollectionTranslator<T> + implements FlinkStreamingPipelineTranslator.StreamTransformTranslator< + Flatten.FlattenPCollectionList<T>> { @Override - public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkStreamingTranslationContext context) { + public void translateNode( + Flatten.FlattenPCollectionList<T> transform, + FlinkStreamingTranslationContext context) { List<PCollection<T>> allInputs = context.getInput(transform).getAll(); DataStream<T> result = null; for (PCollection<T> collection : allInputs) { @@ -580,7 +626,7 @@ public class FlinkStreamingTransformTranslators { } } - public static class ParDoBoundMultiStreamingTranslator<InputT, OutputT> + private static class ParDoBoundMultiStreamingTranslator<InputT, OutputT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator< ParDo.BoundMulti<InputT, OutputT>> { @@ -635,7 +681,10 @@ public class FlinkStreamingTransformTranslators { } } - private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(TupleTag<?> mainTag, Set<TupleTag<?>> secondaryTags) { + private Map<TupleTag<?>, Integer> transformTupleTagsToLabels( + TupleTag<?> mainTag, + Set<TupleTag<?>> secondaryTags) { + Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap(); int count = 0; tagToLabelMap.put(mainTag, count++);