[BEAM-270] Support Timestamps/Windows in Flink Batch

With this change we always use WindowedValue<T> for the underlying Flink
DataSets instead of just T. This allows us to support windowing as well.
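For context: a WindowedValue pairs an element with its timestamp, the windows
it was assigned to, and its pane information. The sketch below is not part of
this commit (the class name WindowedValueExample is made up for illustration);
it only shows how such values are constructed with the Beam SDK:

    import java.util.Collections;

    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.joda.time.Instant;

    public class WindowedValueExample {
      public static void main(String[] args) {
        // An element in the global window, carrying the minimum timestamp.
        WindowedValue<String> inGlobalWindow = WindowedValue.valueInGlobalWindow("hello");
        System.out.println(inGlobalWindow.getValue() + " in " + inGlobalWindow.getWindows());

        // The same element with an explicit timestamp, assigned to a one-minute interval window.
        BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(60000));
        WindowedValue<String> windowed = WindowedValue.of(
            "hello",
            new Instant(30000),                // element timestamp
            Collections.singletonList(window), // assigned windows
            PaneInfo.NO_FIRING);               // pane information

        System.out.println(windowed.getValue() + " @ " + windowed.getTimestamp());
      }
    }

Since the DataSets now carry values of this shape, grouping and combining in
the batch runner can respect window assignment instead of discarding it.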
This change also enables a lot of other things:

- Use WindowedValue throughout
- Add proper translation for Window.into()
- Make side inputs window aware
- Make GroupByKey and Combine transformations window aware; this includes
  support for merging windows. For simplicity, GroupByKey is implemented as
  a Combine with a concatenating CombineFn.

It also removes Flink-specific transformations for things that are handled
by built-in sources/sinks. Among other things, this:

- Removes special translation for AvroIO.Read/Write and TextIO.Read/Write
- Removes special support for Write.Bound; this was not working properly and
  is now handled by the Beam machinery, which uses DoFns for this
- Removes special translation for binary Co-Group; the code was still in
  there but was never used
- Removes ConsoleIO; this can be done using a DoFn

With this change all RunnableOnService tests run on Flink Batch.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24bfca23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24bfca23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24bfca23

Branch: refs/heads/master
Commit: 24bfca230d5db3cb75dd0e30093a10f7523c1238
Parents: 4e60a49
Author: Aljoscha Krettek <[email protected]>
Authored: Tue May 10 13:53:03 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri May 20 08:08:24 2016 +0200

----------------------------------------------------------------------
 runners/flink/runner/pom.xml | 10 -
 .../apache/beam/runners/flink/io/ConsoleIO.java | 82 --
 .../FlinkBatchPipelineTranslator.java | 4 +-
 .../FlinkBatchTransformTranslators.java | 846 ++++++++++++-------
 .../FlinkBatchTranslationContext.java | 56 +-
 .../FlinkStreamingTransformTranslators.java | 22 +-
 .../FlinkStreamingTranslationContext.java | 29 +-
 .../functions/FlinkAssignContext.java | 56 ++
 .../functions/FlinkAssignWindows.java | 51 ++
 .../FlinkCoGroupKeyedListAggregator.java | 61 --
 .../functions/FlinkCreateFunction.java | 63 --
 .../functions/FlinkDoFnFunction.java | 194 ++---
 .../FlinkKeyedListAggregationFunction.java | 78 --
 .../FlinkMergingNonShuffleReduceFunction.java | 238 ++++++
 .../FlinkMergingPartialReduceFunction.java | 205 +++++
 .../functions/FlinkMergingReduceFunction.java | 207 +++++
 .../functions/FlinkMultiOutputDoFnFunction.java | 157 ++--
 .../FlinkMultiOutputProcessContext.java | 176 ++++
 .../FlinkMultiOutputPruningFunction.java | 25 +-
 .../functions/FlinkNoElementAssignContext.java | 71 ++
 .../functions/FlinkPartialReduceFunction.java | 171 +++-
 .../functions/FlinkProcessContext.java | 324 +++++++
 .../functions/FlinkReduceFunction.java | 174 +++-
 .../functions/SideInputInitializer.java | 75 ++
 .../flink/translation/functions/UnionCoder.java | 152 ----
 .../translation/types/CoderTypeInformation.java | 21 +-
 .../translation/types/CoderTypeSerializer.java | 14 +-
 .../translation/types/KvCoderComperator.java | 102 ++-
 .../types/KvCoderTypeInformation.java | 63 +-
 .../types/VoidCoderTypeSerializer.java | 112 ---
 .../wrappers/CombineFnAggregatorWrapper.java | 94 ---
 .../SerializableFnAggregatorWrapper.java | 31 +-
 .../translation/wrappers/SinkOutputFormat.java | 10 +-
 .../translation/wrappers/SourceInputFormat.java | 18 +-
 .../streaming/FlinkGroupByKeyWrapper.java | 10 +-
 .../io/FlinkStreamingCreateFunction.java | 9 +-
 .../apache/beam/runners/flink/AvroITCase.java | 129 ---
 .../beam/runners/flink/FlattenizeITCase.java | 76 --
 .../beam/runners/flink/JoinExamplesITCase.java | 102 ---
 .../runners/flink/MaybeEmptyTestITCase.java | 66 --
 .../runners/flink/ParDoMultiOutputITCase.java | 102 ---
 .../beam/runners/flink/ReadSourceITCase.java | 14 +-
 .../flink/RemoveDuplicatesEmptyITCase.java | 72 --
 .../runners/flink/RemoveDuplicatesITCase.java | 73 --
 .../beam/runners/flink/SideInputITCase.java | 70 --
 .../apache/beam/runners/flink/TfIdfITCase.java | 80 --
 .../beam/runners/flink/WordCountITCase.java | 77 --
 .../runners/flink/WordCountJoin2ITCase.java | 140 ---
 .../runners/flink/WordCountJoin3ITCase.java | 158 ----
 .../flink/streaming/GroupAlsoByWindowTest.java | 3 +-
 .../beam/runners/flink/util/JoinExamples.java | 161 ----
 .../beam/sdk/transforms/join/UnionCoder.java | 2 +-
 52 files changed, 2605 insertions(+), 2731 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index fda27a8..b29a5bf 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -191,16 +191,6 @@
               ]
             </beamTestPipelineOptions>
           </systemPropertyVariables>
-          <excludes>
-            <!-- Tests that use unsupported windowing -->
-            <exclude>**/org/apache/beam/sdk/transforms/CombineTest.java</exclude>
-            <exclude>**/org/apache/beam/sdk/transforms/GroupByKeyTest.java</exclude>
-            <exclude>**/org/apache/beam/sdk/transforms/ViewTest.java</exclude>
-            <exclude>**/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java</exclude>
-            <exclude>**/org/apache/beam/sdk/transforms/windowing/WindowTest.java</exclude>
-            <exclude>**/org/apache/beam/sdk/transforms/windowing/WindowingTest.java</exclude>
-            <exclude>**/org/apache/beam/sdk/util/ReshuffleTest.java</exclude>
-          </excludes>
         </configuration>
       </execution>
       <execution>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
deleted file mode 100644
index 9c36c21..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.io;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * Transform for printing the contents of a {@link org.apache.beam.sdk.values.PCollection}.
- * to standard output.
- *
- * This is Flink-specific and will only work when executed using the
- * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}.
- */
-public class ConsoleIO {
-
-  /**
-   * A PTransform that writes a PCollection to a standard output.
-   */
-  public static class Write {
-
-    /**
-     * Returns a ConsoleIO.Write PTransform with a default step name.
-     */
-    public static Bound create() {
-      return new Bound();
-    }
-
-    /**
-     * Returns a ConsoleIO.Write PTransform with the given step name.
-     */
-    public static Bound named(String name) {
-      return new Bound().named(name);
-    }
-
-    /**
-     * A PTransform that writes a bounded PCollection to standard output.
-     */
-    public static class Bound extends PTransform<PCollection<?>, PDone> {
-      private static final long serialVersionUID = 0;
-
-      Bound() {
-        super("ConsoleIO.Write");
-      }
-
-      Bound(String name) {
-        super(name);
-      }
-
-      /**
-       * Returns a new ConsoleIO.Write PTransform that's like this one but with the given
-       * step
-       * name. Does not modify this object.
-       */
-      public Bound named(String name) {
-        return new Bound(name);
-      }
-
-      @Override
-      public PDone apply(PCollection<?> input) {
-        return PDone.in(input.getPipeline());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 512b822..69c02a2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -32,8 +32,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
- * This is based on {@link org.apache.beam.runners.dataflow.DataflowPipelineTranslator}
+ * {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a
+ * Flink batch job.
  */
 public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 07785aa..8358807 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -17,23 +17,24 @@
  */
 package org.apache.beam.runners.flink.translation;
 
-import org.apache.beam.runners.flink.io.ConsoleIO;
-import org.apache.beam.runners.flink.translation.functions.FlinkCoGroupKeyedListAggregator;
-import org.apache.beam.runners.flink.translation.functions.FlinkCreateFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkKeyedListAggregationFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingReduceFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputDoFnFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.UnionCoder;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation;
 import org.apache.beam.runners.flink.translation.wrappers.SinkOutputFormat;
 import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -41,60 +42,63 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
-import com.google.api.client.util.Maps;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
+import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.io.AvroInputFormat;
 import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.io.TextInputFormat;
-import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.FlatMapOperator;
 import org.apache.flink.api.java.operators.GroupCombineOperator;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.Grouping;
+import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
+import org.apache.flink.api.java.operators.SingleInputUdfOperator;
 import org.apache.flink.api.java.operators.UnsortedGrouping;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Translators for transforming
- * Dataflow {@link org.apache.beam.sdk.transforms.PTransform}s to
- * Flink {@link org.apache.flink.api.java.DataSet}s.
+ * Translators for transforming {@link PTransform PTransforms} to
+ * Flink {@link DataSet DataSets}.
  */
 public class FlinkBatchTransformTranslators {
 
@@ -103,113 +107,90 @@ public class FlinkBatchTransformTranslators {
   // --------------------------------------------------------------------------------------------
 
   @SuppressWarnings("rawtypes")
-  private static final Map<Class<? extends PTransform>, FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
+  private static final Map<
+      Class<? extends PTransform>,
+      FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
 
-  // register the known translators
   static {
     TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch());
 
     TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
-    // we don't need this because we translate the Combine.PerKey directly
-    //TRANSLATORS.put(Combine.GroupedValues.class, new CombineGroupedValuesTranslator());
-
-    TRANSLATORS.put(Create.Values.class, new CreateTranslatorBatch());
+    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
 
     TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
 
-    // TODO we're currently ignoring windows here but that has to change in the future
-    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
+    TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslatorBatch());
 
-    TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiTranslatorBatch());
     TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslatorBatch());
-
-    TRANSLATORS.put(CoGroupByKey.class, new CoGroupByKeyTranslatorBatch());
-
-    TRANSLATORS.put(AvroIO.Read.Bound.class, new AvroIOReadTranslatorBatch());
-    TRANSLATORS.put(AvroIO.Write.Bound.class, new AvroIOWriteTranslatorBatch());
+    TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiTranslatorBatch());
 
     TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch());
-
     TRANSLATORS.put(Write.Bound.class, new WriteSinkTranslatorBatch());
-
-    TRANSLATORS.put(TextIO.Read.Bound.class, new TextIOReadTranslatorBatch());
-    TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteTranslatorBatch());
-
-    // Flink-specific
-    TRANSLATORS.put(ConsoleIO.Write.Bound.class, new ConsoleIOWriteTranslatorBatch());
-
   }
 
-  public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
+  public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
+      PTransform<?, ?> transform) {
     return TRANSLATORS.get(transform.getClass());
   }
 
-  private static class ReadSourceTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
+  private static class ReadSourceTranslatorBatch<T>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
 
     @Override
     public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext context) {
       String name = transform.getName();
       BoundedSource<T> source = transform.getSource();
       PCollection<T> output = context.getOutput(transform);
-      Coder<T> coder = output.getCoder();
-      TypeInformation<T> typeInformation = context.getTypeInfo(output);
+      TypeInformation<WindowedValue<T>> typeInformation = context.getTypeInfo(output);
 
-      DataSource<T> dataSource = new DataSource<>(context.getExecutionEnvironment(),
-          new SourceInputFormat<>(source, context.getPipelineOptions()), typeInformation, name);
+      DataSource<WindowedValue<T>> dataSource = new DataSource<>(
+          context.getExecutionEnvironment(),
+          new SourceInputFormat<>(source, context.getPipelineOptions()),
+          typeInformation,
+          name);
 
       context.setOutputDataSet(output, dataSource);
     }
   }
 
-  private static class AvroIOReadTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Read.Bound<T>> {
-    private static final Logger LOG = LoggerFactory.getLogger(AvroIOReadTranslatorBatch.class);
+  private static class WriteSinkTranslatorBatch<T>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> {
 
     @Override
-    public void translateNode(AvroIO.Read.Bound<T> transform, FlinkBatchTranslationContext context) {
-      String path = transform.getFilepattern();
+    public void translateNode(Write.Bound<T> transform, FlinkBatchTranslationContext context) {
       String name = transform.getName();
-//      Schema schema = transform.getSchema();
-      PValue output = context.getOutput(transform);
-
-      TypeInformation<T> typeInformation = context.getTypeInfo(output);
-
-      // This is super hacky, but unfortunately we cannot get the type otherwise
-      Class<T> extractedAvroType;
-      try {
-        Field typeField = transform.getClass().getDeclaredField("type");
-        typeField.setAccessible(true);
-        @SuppressWarnings("unchecked")
-        Class<T> avroType = (Class<T>) typeField.get(transform);
-        extractedAvroType = avroType;
-      } catch (NoSuchFieldException | IllegalAccessException e) {
-        // we know that the field is there and it is accessible
-        throw new RuntimeException("Could not access type from AvroIO.Bound", e);
-      }
-
-      DataSource<T> source = new DataSource<>(context.getExecutionEnvironment(),
-          new AvroInputFormat<>(new Path(path), extractedAvroType),
-          typeInformation, name);
+      PValue input = context.getInput(transform);
+      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
 
-      context.setOutputDataSet(output, source);
+      inputDataSet.output(new SinkOutputFormat<>(transform, context.getPipelineOptions()))
+          .name(name);
     }
   }
 
-  private static class AvroIOWriteTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> {
+  private static class AvroIOWriteTranslatorBatch<T> implements
+      FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> {
     private static final Logger LOG = LoggerFactory.getLogger(AvroIOWriteTranslatorBatch.class);
+
     @Override
-    public void translateNode(AvroIO.Write.Bound<T> transform, FlinkBatchTranslationContext context) {
-      DataSet<T> inputDataSet = context.getInputDataSet(context.getInput(transform));
+    public void translateNode(
+        AvroIO.Write.Bound<T> transform,
+        FlinkBatchTranslationContext context) {
+      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(context.getInput(transform));
+
       String filenamePrefix = transform.getFilenamePrefix();
       String filenameSuffix = transform.getFilenameSuffix();
       int numShards = transform.getNumShards();
       String shardNameTemplate = transform.getShardNameTemplate();
 
       // TODO: Implement these. We need Flink support for this.
-      LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
+      LOG.warn(
+          "Translation of AvroIO.Write.filenameSuffix not yet supported. Is: {}.",
           filenameSuffix);
-      LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate);
+      LOG.warn(
+          "Translation of AvroIO.Write.shardNameTemplate not yet supported. Is: {}.",
+          shardNameTemplate);
 
       // This is super hacky, but unfortunately we cannot get the type otherwise
       Class<T> extractedAvroType;
@@ -224,8 +205,17 @@ public class FlinkBatchTransformTranslators {
         throw new RuntimeException("Could not access type from AvroIO.Bound", e);
       }
 
-      DataSink<T> dataSink = inputDataSet.output(new AvroOutputFormat<>(new Path
-          (filenamePrefix), extractedAvroType));
+      MapOperator<WindowedValue<T>, T> valueStream = inputDataSet.map(
+          new MapFunction<WindowedValue<T>, T>() {
+            @Override
+            public T map(WindowedValue<T> value) throws Exception {
+              return value.getValue();
+            }
+          }).returns(new CoderTypeInformation<>(context.getInput(transform).getCoder()));
+
+
+      DataSink<T> dataSink = valueStream.output(
+          new AvroOutputFormat<>(new Path(filenamePrefix), extractedAvroType));
 
       if (numShards > 0) {
         dataSink.setParallelism(numShards);
@@ -233,37 +223,16 @@ public class FlinkBatchTransformTranslators {
     }
   }
 
-  private static class TextIOReadTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Read.Bound<String>> {
-    private static final Logger LOG = LoggerFactory.getLogger(TextIOReadTranslatorBatch.class);
-
-    @Override
-    public void translateNode(TextIO.Read.Bound<String> transform, FlinkBatchTranslationContext context) {
-      String path = transform.getFilepattern();
-      String name = transform.getName();
-
-      TextIO.CompressionType compressionType = transform.getCompressionType();
-      boolean needsValidation = transform.needsValidation();
-
-      // TODO: Implement these. We need Flink support for this.
-      LOG.warn("Translation of TextIO.CompressionType not yet supported. Is: {}.", compressionType);
-      LOG.warn("Translation of TextIO.Read.needsValidation not yet supported. Is: {}.", needsValidation);
-
-      PValue output = context.getOutput(transform);
-
-      TypeInformation<String> typeInformation = context.getTypeInfo(output);
-      DataSource<String> source = new DataSource<>(context.getExecutionEnvironment(), new TextInputFormat(new Path(path)), typeInformation, name);
-
-      context.setOutputDataSet(output, source);
-    }
-  }
-
-  private static class TextIOWriteTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> {
+  private static class TextIOWriteTranslatorBatch<T>
      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> {
     private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteTranslatorBatch.class);
 
     @Override
-    public void translateNode(TextIO.Write.Bound<T> transform, FlinkBatchTranslationContext context) {
+    public void translateNode(
+        TextIO.Write.Bound<T> transform,
+        FlinkBatchTranslationContext context) {
       PValue input = context.getInput(transform);
-      DataSet<T> inputDataSet = context.getInputDataSet(input);
+      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
 
       String filenamePrefix = transform.getFilenamePrefix();
       String filenameSuffix = transform.getFilenameSuffix();
@@ -272,12 +241,25 @@ public class FlinkBatchTransformTranslators {
       String shardNameTemplate = transform.getShardNameTemplate();
 
       // TODO: Implement these. We need Flink support for this.
-      LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", needsValidation);
-      LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", filenameSuffix);
-      LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate);
+      LOG.warn(
+          "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
+          needsValidation);
+      LOG.warn(
+          "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
+          filenameSuffix);
+      LOG.warn(
+          "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
+          shardNameTemplate);
 
-      //inputDataSet.print();
-      DataSink<T> dataSink = inputDataSet.writeAsText(filenamePrefix);
+      MapOperator<WindowedValue<T>, T> valueStream = inputDataSet.map(
+          new MapFunction<WindowedValue<T>, T>() {
+            @Override
+            public T map(WindowedValue<T> value) throws Exception {
+              return value.getValue();
+            }
+          }).returns(new CoderTypeInformation<>(transform.getCoder()));
+
+      DataSink<T> dataSink = valueStream.writeAsText(filenamePrefix);
 
       if (numShards > 0) {
         dataSink.setParallelism(numShards);
@@ -285,148 +267,414 @@ public class FlinkBatchTransformTranslators {
     }
   }
 
-  private static class ConsoleIOWriteTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ConsoleIO.Write.Bound> {
+  private static class WindowBoundTranslatorBatch<T>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Bound<T>> {
+
     @Override
-    public void translateNode(ConsoleIO.Write.Bound transform, FlinkBatchTranslationContext context) {
+    public void translateNode(Window.Bound<T> transform, FlinkBatchTranslationContext context) {
       PValue input = context.getInput(transform);
-      DataSet<?> inputDataSet = context.getInputDataSet(input);
-      inputDataSet.printOnTaskManager(transform.getName());
+
+      TypeInformation<WindowedValue<T>> resultTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
+
+      @SuppressWarnings("unchecked")
+      final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy =
+          (WindowingStrategy<T, ? extends BoundedWindow>)
+              context.getOutput(transform).getWindowingStrategy();
+
+      WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+
+      FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
+          new FlinkAssignWindows<>(windowFn);
+
+      DataSet<WindowedValue<T>> resultDataSet = inputDataSet
+          .flatMap(assignWindowsFunction)
+          .name(context.getOutput(transform).getName())
+          .returns(resultTypeInfo);
+
+      context.setOutputDataSet(context.getOutput(transform), resultDataSet);
     }
   }
 
-  private static class WriteSinkTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> {
+  private static class GroupByKeyTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, InputT>> {
 
     @Override
-    public void translateNode(Write.Bound<T> transform, FlinkBatchTranslationContext context) {
-      String name = transform.getName();
-      PValue input = context.getInput(transform);
-      DataSet<T> inputDataSet = context.getInputDataSet(input);
+    public void translateNode(
+        GroupByKey<K, InputT> transform,
+        FlinkBatchTranslationContext context) {
+
+      // for now, this is copied from the Combine.PerKey translator. Once we have the new runner API
+      // we can replace GroupByKey by a Combine.PerKey with the Concatenate CombineFn
+
+      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+
+      Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn =
+          new Concatenate<InputT>().asKeyedFn();
+
+      KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) context.getInput(transform).getCoder();
+
+      Coder<List<InputT>> accumulatorCoder;
+
+      try {
+        accumulatorCoder =
+            combineFn.getAccumulatorCoder(
+                context.getInput(transform).getPipeline().getCoderRegistry(),
+                inputCoder.getKeyCoder(),
+                inputCoder.getValueCoder());
+      } catch (CannotProvideCoderException e) {
+        throw new RuntimeException(e);
+      }
+
+      WindowingStrategy<?, ?> windowingStrategy =
+          context.getInput(transform).getWindowingStrategy();
+
+      TypeInformation<WindowedValue<KV<K, InputT>>> kvCoderTypeInformation =
+          new KvCoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  inputCoder,
+                  windowingStrategy.getWindowFn().windowCoder()));
+
+      TypeInformation<WindowedValue<KV<K, List<InputT>>>> partialReduceTypeInfo =
+          new KvCoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+                  windowingStrategy.getWindowFn().windowCoder()));
+
+      Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
+          new UnsortedGrouping<>(
+              inputDataSet,
+              new Keys.ExpressionKeys<>(new String[]{"key"},
+                  kvCoderTypeInformation));
+
+      FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction;
+      FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction;
+
+      if (windowingStrategy.getWindowFn().isNonMerging()) {
+        @SuppressWarnings("unchecked")
+        WindowingStrategy<?, BoundedWindow> boundedStrategy =
+            (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+
+        partialReduceFunction = new FlinkPartialReduceFunction<>(
+            combineFn,
+            boundedStrategy,
+            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+            context.getPipelineOptions());
+
+        reduceFunction = new FlinkReduceFunction<>(
+            combineFn,
+            boundedStrategy,
+            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+            context.getPipelineOptions());
+
+      } else {
+        if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+          throw new UnsupportedOperationException(
+              "Merging WindowFn with windows other than IntervalWindow is not supported.");
+        }
+
+        @SuppressWarnings("unchecked")
+        WindowingStrategy<?, IntervalWindow> intervalStrategy =
+            (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
+
+        partialReduceFunction = new FlinkMergingPartialReduceFunction<>(
+            combineFn,
+            intervalStrategy,
+            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+            context.getPipelineOptions());
+
+        reduceFunction = new FlinkMergingReduceFunction<>(
+            combineFn,
+            intervalStrategy,
+            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+            context.getPipelineOptions());
+      }
+
+      // Partially GroupReduce the values into the intermediate format AccumT (combine)
+      GroupCombineOperator<
+          WindowedValue<KV<K, InputT>>,
+          WindowedValue<KV<K, List<InputT>>>> groupCombine =
+          new GroupCombineOperator<>(
+              inputGrouping,
+              partialReduceTypeInfo,
+              partialReduceFunction,
+              "GroupCombine: " + transform.getName());
+
+      Grouping<WindowedValue<KV<K, List<InputT>>>> intermediateGrouping =
+          new UnsortedGrouping<>(
+              groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
+
+      // Fully reduce the values and create output format VO
+      GroupReduceOperator<
+          WindowedValue<KV<K, List<InputT>>>, WindowedValue<KV<K, List<InputT>>>> outputDataSet =
+          new GroupReduceOperator<>(
+              intermediateGrouping, partialReduceTypeInfo, reduceFunction, transform.getName());
+
+      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
 
-      inputDataSet.output(new SinkOutputFormat<>(transform, context.getPipelineOptions())).name(name);
     }
   }
 
   /**
-   * Translates a GroupByKey while ignoring window assignments. Current ignores windows.
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   *
+   * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this
+   * is expected to crash!
+   *
+   * <p>This is copied from the dataflow runner code.
+   *
+   * @param <T> the type of elements to concatenate.
    */
-  private static class GroupByKeyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> {
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<T>();
+    }
 
     @Override
-    public void translateNode(GroupByKey<K, V> transform, FlinkBatchTranslationContext context) {
-      DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform));
-      GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>();
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
 
-      TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform));
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
 
-      Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
 
-      GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
-          new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
 
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
     }
   }
 
-  private static class CombinePerKeyTranslatorBatch<K, VI, VA, VO> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Combine.PerKey<K, VI, VO>> {
+
+  private static class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          Combine.PerKey<K, InputT, OutputT>> {
 
     @Override
-    public void translateNode(Combine.PerKey<K, VI, VO> transform, FlinkBatchTranslationContext context) {
-      DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(context.getInput(transform));
+    @SuppressWarnings("unchecked")
+    public void translateNode(
+        Combine.PerKey<K, InputT, OutputT> transform,
+        FlinkBatchTranslationContext context) {
+      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
 
-      @SuppressWarnings("unchecked")
-      Combine.KeyedCombineFn<K, VI, VA, VO> keyedCombineFn = (Combine.KeyedCombineFn<K, VI, VA, VO>) transform.getFn();
+      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn =
+          (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
+
+      KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) context.getInput(transform).getCoder();
 
-      KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) context.getInput(transform).getCoder();
+      Coder<AccumT> accumulatorCoder;
 
-      Coder<VA> accumulatorCoder =
-          null;
       try {
-        accumulatorCoder = keyedCombineFn.getAccumulatorCoder(context.getInput(transform).getPipeline().getCoderRegistry(), inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+        accumulatorCoder =
+            combineFn.getAccumulatorCoder(
+                context.getInput(transform).getPipeline().getCoderRegistry(),
+                inputCoder.getKeyCoder(),
+                inputCoder.getValueCoder());
       } catch (CannotProvideCoderException e) {
-        e.printStackTrace();
-        // TODO
+        throw new RuntimeException(e);
      }
 
-      TypeInformation<KV<K, VI>> kvCoderTypeInformation = new KvCoderTypeInformation<>(inputCoder);
-      TypeInformation<KV<K, VA>> partialReduceTypeInfo = new KvCoderTypeInformation<>(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder));
+      WindowingStrategy<?, ?> windowingStrategy =
+          context.getInput(transform).getWindowingStrategy();
+
+      TypeInformation<WindowedValue<KV<K, InputT>>> kvCoderTypeInformation =
+          new KvCoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  inputCoder,
+                  windowingStrategy.getWindowFn().windowCoder()));
+
+      TypeInformation<WindowedValue<KV<K, AccumT>>> partialReduceTypeInfo =
+          new KvCoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+                  windowingStrategy.getWindowFn().windowCoder()));
+
+      Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
+          new UnsortedGrouping<>(
+              inputDataSet,
+              new Keys.ExpressionKeys<>(new String[]{"key"},
+                  kvCoderTypeInformation));
+
+      // construct a map from side input to WindowingStrategy so that
+      // the DoFn runner can map main-input windows to side input windows
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+      for (PCollectionView<?> sideInput: transform.getSideInputs()) {
+        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+      }
 
-      Grouping<KV<K, VI>> inputGrouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation));
+      if (windowingStrategy.getWindowFn().isNonMerging()) {
+        WindowingStrategy<?, BoundedWindow> boundedStrategy =
+            (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+
+        FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction =
+            new FlinkPartialReduceFunction<>(
+                combineFn,
+                boundedStrategy,
+                sideInputStrategies,
+                context.getPipelineOptions());
+
+        FlinkReduceFunction<K, AccumT, OutputT, ?> reduceFunction =
+            new FlinkReduceFunction<>(
+                combineFn,
+                boundedStrategy,
+                sideInputStrategies,
+                context.getPipelineOptions());
+
+        // Partially GroupReduce the values into the intermediate format AccumT (combine)
+        GroupCombineOperator<
+            WindowedValue<KV<K, InputT>>,
+            WindowedValue<KV<K, AccumT>>> groupCombine =
+            new GroupCombineOperator<>(
+                inputGrouping,
+                partialReduceTypeInfo,
+                partialReduceFunction,
+                "GroupCombine: " + transform.getName());
+
+        transformSideInputs(transform.getSideInputs(), groupCombine, context);
+
+        TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
+            context.getTypeInfo(context.getOutput(transform));
+
+        Grouping<WindowedValue<KV<K, AccumT>>> intermediateGrouping =
+            new UnsortedGrouping<>(
+                groupCombine,
+                new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
+
+        // Fully reduce the values and create output format OutputT
+        GroupReduceOperator<
+            WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
+            new GroupReduceOperator<>(
+                intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+        transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+        context.setOutputDataSet(context.getOutput(transform), outputDataSet);
 
-      FlinkPartialReduceFunction<K, VI, VA> partialReduceFunction = new FlinkPartialReduceFunction<>(keyedCombineFn);
+      } else {
+        if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+          throw new UnsupportedOperationException(
+              "Merging WindowFn with windows other than IntervalWindow is not supported.");
+        }
 
-      // Partially GroupReduce the values into the intermediate format VA (combine)
-      GroupCombineOperator<KV<K, VI>, KV<K, VA>> groupCombine =
-          new GroupCombineOperator<>(inputGrouping, partialReduceTypeInfo, partialReduceFunction,
-              "GroupCombine: " + transform.getName());
+        // for merging windows we can't do a pre-shuffle combine step since
+        // elements would not be in their correct windows for side-input access
 
-      // Reduce fully to VO
-      GroupReduceFunction<KV<K, VA>, KV<K, VO>> reduceFunction = new FlinkReduceFunction<>(keyedCombineFn);
+        WindowingStrategy<?, IntervalWindow> intervalStrategy =
+            (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
 
-      TypeInformation<KV<K, VO>> reduceTypeInfo = context.getTypeInfo(context.getOutput(transform));
+        FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, ?> reduceFunction =
+            new FlinkMergingNonShuffleReduceFunction<>(
+                combineFn,
+                intervalStrategy,
+                sideInputStrategies,
+                context.getPipelineOptions());
 
-      Grouping<KV<K, VA>> intermediateGrouping = new UnsortedGrouping<>(groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
+        TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
+            context.getTypeInfo(context.getOutput(transform));
+
+        Grouping<WindowedValue<KV<K, InputT>>> grouping =
+            new UnsortedGrouping<>(
+                inputDataSet,
+                new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation));
+
+        // Fully reduce the values and create output format OutputT
+        GroupReduceOperator<
+            WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
+            new GroupReduceOperator<>(
+                grouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+        transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+        context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+      }
 
-      // Fully reduce the values and create output format VO
-      GroupReduceOperator<KV<K, VA>, KV<K, VO>> outputDataSet =
-          new GroupReduceOperator<>(intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
     }
   }
 
-//  private static class CombineGroupedValuesTranslator<K, VI, VO> implements FlinkPipelineTranslator.TransformTranslator<Combine.GroupedValues<K, VI, VO>> {
-//
-//    @Override
-//    public void translateNode(Combine.GroupedValues<K, VI, VO> transform, TranslationContext context) {
-//      DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(transform.getInput());
-//
-//      Combine.KeyedCombineFn<? super K, ? super VI, ?, VO> keyedCombineFn = transform.getFn();
-//
-//      GroupReduceFunction<KV<K, VI>, KV<K, VO>> groupReduceFunction = new FlinkCombineFunction<>(keyedCombineFn);
-//
-//      TypeInformation<KV<K, VO>> typeInformation = context.getTypeInfo(transform.getOutput());
-//
-//      Grouping<KV<K, VI>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{""}, inputDataSet.getType()));
-//
-//      GroupReduceOperator<KV<K, VI>, KV<K, VO>> outputDataSet =
-//          new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
-//      context.setOutputDataSet(transform.getOutput(), outputDataSet);
-//    }
-//  }
-
-  private static class ParDoBoundTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> {
-    private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class);
+  private static class ParDoBoundTranslatorBatch<InputT, OutputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          ParDo.Bound<InputT, OutputT>> {
 
     @Override
-    public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkBatchTranslationContext context) {
-      DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform));
+    public void translateNode(
+        ParDo.Bound<InputT, OutputT> transform,
+        FlinkBatchTranslationContext context) {
+      DataSet<WindowedValue<InputT>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
 
-      final DoFn<IN, OUT> doFn = transform.getFn();
+      final DoFn<InputT, OutputT> doFn = transform.getFn();
 
-      TypeInformation<OUT> typeInformation = context.getTypeInfo(context.getOutput(transform));
+      TypeInformation<WindowedValue<OutputT>> typeInformation =
          context.getTypeInfo(context.getOutput(transform));
 
-      FlinkDoFnFunction<IN, OUT> doFnWrapper = new FlinkDoFnFunction<>(doFn, context.getPipelineOptions());
-      MapPartitionOperator<IN, OUT> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName());
+      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
 
-      transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+      // construct a map from side input to WindowingStrategy so that
+      // the DoFn runner can map main-input windows to side input windows
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+      for (PCollectionView<?> sideInput: sideInputs) {
+        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+      }
+
+      FlinkDoFnFunction<InputT, OutputT> doFnWrapper =
+          new FlinkDoFnFunction<>(
+              doFn,
+              context.getOutput(transform).getWindowingStrategy(),
+              sideInputStrategies,
+              context.getPipelineOptions());
+
+      MapPartitionOperator<WindowedValue<InputT>, WindowedValue<OutputT>> outputDataSet =
+          new MapPartitionOperator<>(
+              inputDataSet,
+              typeInformation,
+              doFnWrapper,
+              transform.getName());
+
+      transformSideInputs(sideInputs, outputDataSet, context);
 
       context.setOutputDataSet(context.getOutput(transform), outputDataSet);
     }
   }
 
-  private static class ParDoBoundMultiTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.BoundMulti<IN, OUT>> {
-    private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslatorBatch.class);
+  private static class ParDoBoundMultiTranslatorBatch<InputT, OutputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          ParDo.BoundMulti<InputT, OutputT>> {
 
     @Override
-    public void translateNode(ParDo.BoundMulti<IN, OUT> transform, FlinkBatchTranslationContext context) {
-      DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform));
+    public void translateNode(
+        ParDo.BoundMulti<InputT, OutputT> transform,
+        FlinkBatchTranslationContext context) {
+      DataSet<WindowedValue<InputT>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
 
-      final DoFn<IN, OUT> doFn = transform.getFn();
+      final DoFn<InputT, OutputT> doFn = transform.getFn();
 
       Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
 
       Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
-      // put the main output at index 0, FlinkMultiOutputDoFnFunction also expects this
+      // put the main output at index 0, FlinkMultiOutputDoFnFunction expects this
       outputMap.put(transform.getMainOutputTag(), 0);
       int count = 1;
      for (TupleTag<?> tag: outputs.keySet()) {
@@ -435,58 +683,118 @@ public class FlinkBatchTransformTranslators {
         }
       }
 
+      // assume that the windowing strategy is the same for all outputs
+      WindowingStrategy<?, ?> windowingStrategy = null;
+
       // collect all output Coders and create a UnionCoder for our tagged outputs
       List<Coder<?>> outputCoders = Lists.newArrayList();
       for (PCollection<?> coll: outputs.values()) {
         outputCoders.add(coll.getCoder());
+        windowingStrategy = coll.getWindowingStrategy();
+      }
+
+      if (windowingStrategy == null) {
+        throw new IllegalStateException("No outputs defined.");
       }
 
       UnionCoder unionCoder = UnionCoder.of(outputCoders);
 
-      @SuppressWarnings("unchecked")
-      TypeInformation<RawUnionValue> typeInformation = new CoderTypeInformation<>(unionCoder);
+      TypeInformation<WindowedValue<RawUnionValue>> typeInformation =
+          new CoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  unionCoder,
+                  windowingStrategy.getWindowFn().windowCoder()));
 
-      @SuppressWarnings("unchecked")
-      FlinkMultiOutputDoFnFunction<IN, OUT> doFnWrapper = new FlinkMultiOutputDoFnFunction(doFn, context.getPipelineOptions(), outputMap);
-      MapPartitionOperator<IN, RawUnionValue> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName());
+      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
 
-      transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+      // construct a map from side input to WindowingStrategy so that
+      // the DoFn runner can map main-input windows to side input windows
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+      for (PCollectionView<?> sideInput: sideInputs) {
+        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+      }
 
-      for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
-        TypeInformation<Object> outputType = context.getTypeInfo(output.getValue());
-        int outputTag = outputMap.get(output.getKey());
-        FlinkMultiOutputPruningFunction<Object> pruningFunction = new FlinkMultiOutputPruningFunction<>(outputTag);
-        FlatMapOperator<RawUnionValue, Object> pruningOperator = new
-            FlatMapOperator<>(outputDataSet, outputType,
-            pruningFunction, output.getValue().getName());
-        context.setOutputDataSet(output.getValue(), pruningOperator);
+      @SuppressWarnings("unchecked")
+      FlinkMultiOutputDoFnFunction<InputT, OutputT> doFnWrapper =
+          new FlinkMultiOutputDoFnFunction(
+              doFn,
+              windowingStrategy,
+              sideInputStrategies,
+              context.getPipelineOptions(),
+              outputMap);
+
+      MapPartitionOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>> taggedDataSet =
+          new MapPartitionOperator<>(
+              inputDataSet,
+              typeInformation,
+              doFnWrapper,
+              transform.getName());
+
+      transformSideInputs(sideInputs, taggedDataSet, context);
 
+      for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
+        pruneOutput(
+            taggedDataSet,
+            context,
+            outputMap.get(output.getKey()),
+            (PCollection) output.getValue());
       }
     }
+
+    private <T> void pruneOutput(
+        MapPartitionOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>> taggedDataSet,
+        FlinkBatchTranslationContext context,
+        int integerTag,
+        PCollection<T> collection) {
+      TypeInformation<WindowedValue<T>> outputType = context.getTypeInfo(collection);
+
+      FlinkMultiOutputPruningFunction<T> pruningFunction =
+          new FlinkMultiOutputPruningFunction<>(integerTag);
+
+      FlatMapOperator<WindowedValue<RawUnionValue>, WindowedValue<T>> pruningOperator =
+          new FlatMapOperator<>(
+              taggedDataSet,
+              outputType,
+              pruningFunction,
+              collection.getName());
+
+      context.setOutputDataSet(collection, pruningOperator);
+    }
   }
 
-  private static class FlattenPCollectionTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Flatten.FlattenPCollectionList<T>> {
+  private static class FlattenPCollectionTranslatorBatch<T>
      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
          Flatten.FlattenPCollectionList<T>> {
 
     @Override
     @SuppressWarnings("unchecked")
-    public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkBatchTranslationContext context) {
+    public void translateNode(
+        Flatten.FlattenPCollectionList<T> transform,
+        FlinkBatchTranslationContext context) {
+
       List<PCollection<T>> allInputs = context.getInput(transform).getAll();
-      DataSet<T> result = null;
+
+      DataSet<WindowedValue<T>> result = null;
+
       if (allInputs.isEmpty()) {
+
+        // create an empty dummy source to satisfy downstream operations
         // we cannot create an empty source in Flink, therefore we have to
         // add the flatMap that simply never forwards the single element
         DataSource<String> dummySource = context.getExecutionEnvironment().fromElements("dummy");
-        result = dummySource.flatMap(new FlatMapFunction<String, T>() {
+        result = dummySource.flatMap(new FlatMapFunction<String, WindowedValue<T>>() {
           @Override
-          public void flatMap(String s, Collector<T> collector) throws Exception {
+          public void flatMap(String s, Collector<WindowedValue<T>> collector) throws Exception {
             // never return anything
          }
-        }).returns(new CoderTypeInformation<>((Coder<T>) VoidCoder.of()));
+        }).returns(
+            new CoderTypeInformation<>(
+                WindowedValue.getFullCoder(
+                    (Coder<T>) VoidCoder.of(),
+                    GlobalWindow.Coder.INSTANCE)));
       } else {
         for (PCollection<T> collection : allInputs) {
-          DataSet<T> current = context.getInputDataSet(collection);
+          DataSet<WindowedValue<T>> current = context.getInputDataSet(collection);
           if (result == null) {
             result = current;
           } else {
@@ -494,103 +802,47 @@ public class FlinkBatchTransformTranslators {
           }
         }
       }
-      context.setOutputDataSet(context.getOutput(transform), result);
-    }
-  }
 
-  private static class CreatePCollectionViewTranslatorBatch<R, T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<View.CreatePCollectionView<R, T>> {
-    @Override
-    public void translateNode(View.CreatePCollectionView<R, T> transform, FlinkBatchTranslationContext context) {
-      DataSet<T> inputDataSet = context.getInputDataSet(context.getInput(transform));
-      PCollectionView<T> input = transform.apply(null);
-      context.setSideInputDataSet(input, inputDataSet);
+      // insert a dummy filter, there seems to be a bug in Flink
+      // that produces duplicate elements after the union in some cases
+      // if we don't
+      result = result.filter(new FilterFunction<WindowedValue<T>>() {
+        @Override
+        public boolean filter(WindowedValue<T> tWindowedValue) throws Exception {
+          return true;
+        }
+      }).name("UnionFixFilter");
+      context.setOutputDataSet(context.getOutput(transform), result);
     }
   }
 
-  private static class CreateTranslatorBatch<OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Create.Values<OUT>> {
+  private static class CreatePCollectionViewTranslatorBatch<ElemT, ViewT>
      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
          View.CreatePCollectionView<ElemT, ViewT>> {
 
     @Override
-    public void translateNode(Create.Values<OUT> transform, FlinkBatchTranslationContext context) {
-      TypeInformation<OUT> typeInformation = context.getOutputTypeInfo();
-      Iterable<OUT> elements = transform.getElements();
-
-      // we need to serialize the elements to byte arrays, since they might contain
-      // elements that are not serializable by Java serialization. We deserialize them
-      // in the FlatMap function using the Coder.
-
-      List<byte[]> serializedElements = Lists.newArrayList();
-      Coder<OUT> coder = context.getOutput(transform).getCoder();
-      for (OUT element: elements) {
-        ByteArrayOutputStream bao = new ByteArrayOutputStream();
-        try {
-          coder.encode(element, bao, Coder.Context.OUTER);
-          serializedElements.add(bao.toByteArray());
-        } catch (IOException e) {
-          throw new RuntimeException("Could not serialize Create elements using Coder: " + e);
-        }
-      }
+    public void translateNode(
+        View.CreatePCollectionView<ElemT, ViewT> transform,
+        FlinkBatchTranslationContext context) {
+      DataSet<WindowedValue<ElemT>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
 
-      DataSet<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1);
-      FlinkCreateFunction<Integer, OUT> flatMapFunction = new FlinkCreateFunction<>(serializedElements, coder);
-      FlatMapOperator<Integer, OUT> outputDataSet = new FlatMapOperator<>(initDataSet, typeInformation, flatMapFunction, transform.getName());
+      PCollectionView<ViewT> input = transform.getView();
 
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+      context.setSideInputDataSet(input, inputDataSet);
     }
   }
 
-  private static void transformSideInputs(List<PCollectionView<?>> sideInputs,
-                                          MapPartitionOperator<?, ?> outputDataSet,
-                                          FlinkBatchTranslationContext context) {
+  private static void transformSideInputs(
+      List<PCollectionView<?>> sideInputs,
+      SingleInputUdfOperator<?, ?, ?> outputDataSet,
+      FlinkBatchTranslationContext context) {
     // get corresponding Flink broadcast DataSets
-    for(PCollectionView<?> input : sideInputs) {
+    for (PCollectionView<?> input : sideInputs) {
       DataSet<?> broadcastSet = context.getSideInputDataSet(input);
       outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId());
     }
   }
 
-//  Disabled because it depends on a pending pull request to the DataFlowSDK
-  /**
-   * Special composite transform translator. Only called if the CoGroup is two dimensional.
- * @param <K> - */ - private static class CoGroupByKeyTranslatorBatch<K, V1, V2> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<CoGroupByKey<K>> { - - @Override - public void translateNode(CoGroupByKey<K> transform, FlinkBatchTranslationContext context) { - KeyedPCollectionTuple<K> input = context.getInput(transform); - - CoGbkResultSchema schema = input.getCoGbkResultSchema(); - List<KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?>> keyedCollections = input.getKeyedCollections(); - - KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection1 = keyedCollections.get(0); - KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection2 = keyedCollections.get(1); - - TupleTag<?> tupleTag1 = taggedCollection1.getTupleTag(); - TupleTag<?> tupleTag2 = taggedCollection2.getTupleTag(); - - PCollection<? extends KV<K, ?>> collection1 = taggedCollection1.getCollection(); - PCollection<? extends KV<K, ?>> collection2 = taggedCollection2.getCollection(); - - DataSet<KV<K,V1>> inputDataSet1 = context.getInputDataSet(collection1); - DataSet<KV<K,V2>> inputDataSet2 = context.getInputDataSet(collection2); - - TypeInformation<KV<K,CoGbkResult>> typeInfo = context.getOutputTypeInfo(); - - FlinkCoGroupKeyedListAggregator<K,V1,V2> aggregator = new FlinkCoGroupKeyedListAggregator<>(schema, tupleTag1, tupleTag2); - - Keys.ExpressionKeys<KV<K,V1>> keySelector1 = new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet1.getType()); - Keys.ExpressionKeys<KV<K,V2>> keySelector2 = new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet2.getType()); - - DataSet<KV<K, CoGbkResult>> out = new CoGroupOperator<>(inputDataSet1, inputDataSet2, - keySelector1, keySelector2, - aggregator, typeInfo, null, transform.getName()); - context.setOutputDataSet(context.getOutput(transform), out); - } - } - - // -------------------------------------------------------------------------------------------- - // Miscellaneous - // -------------------------------------------------------------------------------------------- - private FlinkBatchTransformTranslators() {} } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java index 501b1ea..ecc3a65 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java @@ -18,26 +18,28 @@ package org.apache.beam.runners.flink.translation; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; 
import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TypedPValue; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; import java.util.HashMap; import java.util.Map; +/** + * Helper for {@link FlinkBatchPipelineTranslator} and translators in + * {@link FlinkBatchTransformTranslators}. + */ public class FlinkBatchTranslationContext { private final Map<PValue, DataSet<?>> dataSets; @@ -81,13 +83,13 @@ public class FlinkBatchTranslationContext { } @SuppressWarnings("unchecked") - public <T> DataSet<T> getInputDataSet(PValue value) { + public <T> DataSet<WindowedValue<T>> getInputDataSet(PValue value) { // assume that the DataSet is used as an input if retrieved here danglingDataSets.remove(value); - return (DataSet<T>) dataSets.get(value); + return (DataSet<WindowedValue<T>>) dataSets.get(value); } - public void setOutputDataSet(PValue value, DataSet<?> set) { + public <T> void setOutputDataSet(PValue value, DataSet<WindowedValue<T>> set) { if (!dataSets.containsKey(value)) { dataSets.put(value, set); danglingDataSets.put(value, set); @@ -107,40 +109,32 @@ public class FlinkBatchTranslationContext { return (DataSet<T>) broadcastDataSets.get(value); } - public void setSideInputDataSet(PCollectionView<?> value, DataSet<?> set) { + public <ViewT, ElemT> void setSideInputDataSet( + PCollectionView<ViewT> value, + DataSet<WindowedValue<ElemT>> set) { if (!broadcastDataSets.containsKey(value)) { broadcastDataSets.put(value, set); } } - - @SuppressWarnings("unchecked") - public <T> TypeInformation<T> getTypeInfo(PInput output) { - if (output instanceof TypedPValue) { - Coder<?> outputCoder = ((TypedPValue) output).getCoder(); - if (outputCoder instanceof KvCoder) { - return new KvCoderTypeInformation((KvCoder) outputCoder); - } else { - return new CoderTypeInformation(outputCoder); - } - } - return new GenericTypeInfo<>((Class<T>)Object.class); - } - - public <T> TypeInformation<T> getInputTypeInfo() { - return getTypeInfo(currentTransform.getInput()); - } - public <T> TypeInformation<T> getOutputTypeInfo() { - return getTypeInfo((PValue) currentTransform.getOutput()); + @SuppressWarnings("unchecked") + public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) { + Coder<T> valueCoder = collection.getCoder(); + WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = + WindowedValue.getFullCoder( + valueCoder, + collection.getWindowingStrategy().getWindowFn().windowCoder()); + + return new CoderTypeInformation<>(windowedValueCoder); } @SuppressWarnings("unchecked") - <I extends PInput> I getInput(PTransform<I, ?> transform) { - return (I) currentTransform.getInput(); + <T extends PInput> T getInput(PTransform<T, ?> transform) { + return (T) currentTransform.getInput(); } @SuppressWarnings("unchecked") - <O extends POutput> O getOutput(PTransform<?, O> transform) { - return (O) currentTransform.getOutput(); + <T extends POutput> T getOutput(PTransform<?, T> transform) { + return (T) currentTransform.getOutput(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 2778d5c..b3fed99 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.flink.translation; -import org.apache.beam.runners.flink.translation.functions.UnionCoder; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.types.FlinkCoder; import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat; @@ -46,6 +45,7 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.join.UnionCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -229,29 +229,15 @@ public class FlinkStreamingTransformTranslators { BoundedSource<T> boundedSource = transform.getSource(); PCollection<T> output = context.getOutput(transform); - Coder<T> defaultOutputCoder = boundedSource.getDefaultOutputCoder(); - CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(defaultOutputCoder); + TypeInformation<WindowedValue<T>> typeInfo = context.getTypeInfo(output); - DataStream<T> source = context.getExecutionEnvironment().createInput( + DataStream<WindowedValue<T>> source = context.getExecutionEnvironment().createInput( new SourceInputFormat<>( boundedSource, context.getPipelineOptions()), typeInfo); - DataStream<WindowedValue<T>> windowedStream = source.flatMap( - new FlatMapFunction<T, WindowedValue<T>>() { - @Override - public void flatMap(T value, Collector<WindowedValue<T>> out) throws Exception { - out.collect( - WindowedValue.of(value, - Instant.now(), - GlobalWindow.INSTANCE, - PaneInfo.NO_FIRING)); - } - }) - .assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>()); - - context.setOutputDataStream(output, windowedStream); + context.setOutputDataStream(output, source); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java index 8bc7317..0cb80ba 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java @@ -17,21 +17,30 @@ */ package org.apache.beam.runners.flink.translation; +import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import 
org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import com.google.common.base.Preconditions; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.HashMap; import java.util.Map; +/** + * Helper for keeping track of which {@link DataStream DataStreams} map + * to which {@link PTransform PTransforms}. + */ public class FlinkStreamingTranslationContext { private final StreamExecutionEnvironment env; @@ -80,12 +89,24 @@ public class FlinkStreamingTranslationContext { } @SuppressWarnings("unchecked") - public <I extends PInput> I getInput(PTransform<I, ?> transform) { - return (I) currentTransform.getInput(); + public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) { + Coder<T> valueCoder = collection.getCoder(); + WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = + WindowedValue.getFullCoder( + valueCoder, + collection.getWindowingStrategy().getWindowFn().windowCoder()); + + return new CoderTypeInformation<>(windowedValueCoder); + } + + + @SuppressWarnings("unchecked") + public <T extends PInput> T getInput(PTransform<T, ?> transform) { + return (T) currentTransform.getInput(); } @SuppressWarnings("unchecked") - public <O extends POutput> O getOutput(PTransform<?, O> transform) { - return (O) currentTransform.getOutput(); + public <T extends POutput> T getOutput(PTransform<?, T> transform) { + return (T) currentTransform.getOutput(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java new file mode 100644 index 0000000..7ea8c20 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; + +import org.joda.time.Instant; + +import java.util.Collection; + +/** + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext} for + * Flink functions. + */ +class FlinkAssignContext<InputT, W extends BoundedWindow> + extends WindowFn<InputT, W>.AssignContext { + private final WindowedValue<InputT> value; + + FlinkAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) { + fn.super(); + this.value = value; + } + + @Override + public InputT element() { + return value.getValue(); + } + + @Override + public Instant timestamp() { + return value.getTimestamp(); + } + + @Override + public Collection<? extends BoundedWindow> windows() { + return value.getWindows(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java new file mode 100644 index 0000000..e07e49a --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.util.Collector; + +import java.util.Collection; + +/** + * Flink {@link FlatMapFunction} for implementing + * {@link org.apache.beam.sdk.transforms.windowing.Window.Bound}. 
+ */ +public class FlinkAssignWindows<T, W extends BoundedWindow> + implements FlatMapFunction<WindowedValue<T>, WindowedValue<T>> { + + private final WindowFn<T, W> windowFn; + + public FlinkAssignWindows(WindowFn<T, W> windowFn) { + this.windowFn = windowFn; + } + + @Override + public void flatMap( + WindowedValue<T> input, Collector<WindowedValue<T>> collector) throws Exception { + Collection<W> windows = windowFn.assignWindows(new FlinkAssignContext<>(windowFn, input)); + for (W window: windows) { + collector.collect( + WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPane())); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java deleted file mode 100644 index 8e7cdd7..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.functions; - -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGbkResultSchema; -import org.apache.beam.sdk.transforms.join.RawUnionValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TupleTag; - -import org.apache.flink.api.common.functions.CoGroupFunction; -import org.apache.flink.util.Collector; - -import java.util.ArrayList; -import java.util.List; - - -public class FlinkCoGroupKeyedListAggregator<K,V1,V2> implements CoGroupFunction<KV<K,V1>, KV<K,V2>, KV<K, CoGbkResult>>{ - - private CoGbkResultSchema schema; - private TupleTag<?> tupleTag1; - private TupleTag<?> tupleTag2; - - public FlinkCoGroupKeyedListAggregator(CoGbkResultSchema schema, TupleTag<?> tupleTag1, TupleTag<?> tupleTag2) { - this.schema = schema; - this.tupleTag1 = tupleTag1; - this.tupleTag2 = tupleTag2; - } - - @Override - public void coGroup(Iterable<KV<K,V1>> first, Iterable<KV<K,V2>> second, Collector<KV<K, CoGbkResult>> out) throws Exception { - K k = null; - List<RawUnionValue> result = new ArrayList<>(); - int index1 = schema.getIndex(tupleTag1); - for (KV<K,?> entry : first) { - k = entry.getKey(); - result.add(new RawUnionValue(index1, entry.getValue())); - } - int index2 = schema.getIndex(tupleTag2); - for (KV<K,?> entry : second) { - k = entry.getKey(); - result.add(new RawUnionValue(index2, entry.getValue())); - } - out.collect(KV.of(k, new CoGbkResult(schema, result))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java deleted file mode 100644 index e5ac748..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer; -import org.apache.beam.sdk.coders.Coder; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.util.Collector; - -import java.io.ByteArrayInputStream; -import java.util.List; - -/** - * This is a hack for transforming a {@link org.apache.beam.sdk.transforms.Create} - * operation. 
Flink does not allow {@code null} in its equivalent operation: - {@link org.apache.flink.api.java.ExecutionEnvironment#fromElements(Object[])}. Therefore - we use a DataSource with one dummy element and output the elements of the Create operation - inside this FlatMap. - */ -public class FlinkCreateFunction<IN, OUT> implements FlatMapFunction<IN, OUT> { - - private final List<byte[]> elements; - private final Coder<OUT> coder; - - public FlinkCreateFunction(List<byte[]> elements, Coder<OUT> coder) { - this.elements = elements; - this.coder = coder; - } - - @Override - @SuppressWarnings("unchecked") - public void flatMap(IN value, Collector<OUT> out) throws Exception { - - for (byte[] element : elements) { - ByteArrayInputStream bai = new ByteArrayInputStream(element); - OUT outValue = coder.decode(bai, Coder.Context.OUTER); - if (outValue == null) { - // TODO Flink doesn't allow null values in records - out.collect((OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE); - } else { - out.collect(outValue); - } - } - - out.close(); - } -}
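----------------------------------------------------------------------
Editor's note (not part of the commit): the central pattern of this change is wrapping a collection's element coder in a WindowedValue.FullWindowedValueCoder and handing it to Flink as type information. Below is a minimal, self-contained sketch of that pattern as it appears in the getTypeInfo() helpers added to both translation contexts above; the class name WindowedTypeInfoSketch is illustrative, everything else comes from the APIs referenced in the diff.

import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.flink.api.common.typeinfo.TypeInformation;

class WindowedTypeInfoSketch {
  // Flink needs TypeInformation for every DataSet<WindowedValue<T>>.
  // It is derived from the element coder plus the window coder of the
  // collection's WindowFn, so serialization stays coder-based end to end.
  static <T> TypeInformation<WindowedValue<T>> windowedTypeInfo(
      PCollection<T> collection) {
    Coder<T> valueCoder = collection.getCoder();
    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
        WindowedValue.getFullCoder(
            valueCoder,
            collection.getWindowingStrategy().getWindowFn().windowCoder());
    return new CoderTypeInformation<>(windowedValueCoder);
  }
}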
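Editor's note (not part of the commit): a sketch of how the new FlinkAssignWindows function can be wired up, here re-windowing String elements into fixed one-minute windows. The choice of WindowFn, the unchecked cast, and the class name AssignWindowsSketch are illustrative assumptions; the returns(...) call mirrors the way the translators above hand Flink a coder-backed type.

import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.api.java.DataSet;
import org.joda.time.Duration;

class AssignWindowsSketch {
  // Each input element is emitted once for every window the WindowFn
  // assigns it to; timestamp and pane are carried over unchanged.
  @SuppressWarnings("unchecked")
  static DataSet<WindowedValue<String>> intoFixedWindows(
      DataSet<WindowedValue<String>> input) {
    WindowFn<String, IntervalWindow> windowFn =
        (WindowFn<String, IntervalWindow>)
            (WindowFn<?, ?>) FixedWindows.of(Duration.standardMinutes(1));
    return input
        .flatMap(new FlinkAssignWindows<>(windowFn))
        // Flink cannot infer the generic WindowedValue type from the
        // function alone, so give it the coder-derived type information.
        .returns(
            new CoderTypeInformation<>(
                WindowedValue.getFullCoder(
                    StringUtf8Coder.of(), windowFn.windowCoder())));
  }
}
----------------------------------------------------------------------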
