Repository: beam Updated Branches: refs/heads/master fac4f3e3c -> 686b774ce
[BEAM-1993] Remove special unbounded Flink source/sink Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8213fa6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8213fa6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8213fa6 Branch: refs/heads/master Commit: d8213fa6b274cd6acbf4da6deffd21ca23fd7f42 Parents: fac4f3e Author: Ismaël Mejía <ieme...@apache.org> Authored: Tue Apr 18 16:03:11 2017 +0200 Committer: Ismaël Mejía <ieme...@apache.org> Committed: Tue Apr 18 16:15:09 2017 +0200 ---------------------------------------------------------------------- .../examples/streaming/KafkaIOExamples.java | 338 ------------------- .../KafkaWindowedWordCountExample.java | 164 --------- .../FlinkStreamingTransformTranslators.java | 87 +---- .../flink/translation/types/FlinkCoder.java | 63 ---- .../streaming/io/UnboundedFlinkSink.java | 200 ----------- .../streaming/io/UnboundedFlinkSource.java | 120 ------- 6 files changed, 12 insertions(+), 960 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java deleted file mode 100644 index 616e276..0000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java +++ /dev/null @@ -1,338 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.examples.streaming; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.Serializable; -import java.util.Properties; -import org.apache.beam.runners.flink.FlinkPipelineOptions; -import org.apache.beam.runners.flink.FlinkRunner; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.Write; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08; -import 
org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; - -/** - * Recipes/Examples that demonstrate how to read/write data from/to Kafka. - */ -public class KafkaIOExamples { - - - private static final String KAFKA_TOPIC = "input"; // Default kafka topic to read from - private static final String KAFKA_AVRO_TOPIC = "output"; // Default kafka topic to read from - private static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact - private static final String GROUP_ID = "myGroup"; // Default groupId - private static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect (Kafka) - - /** - * Read/Write String data to Kafka. - */ - public static class KafkaString { - - /** - * Read String data from Kafka. - */ - public static class ReadStringFromKafka { - - public static void main(String[] args) { - - Pipeline p = initializePipeline(args); - KafkaOptions options = getOptions(p); - - FlinkKafkaConsumer08<String> kafkaConsumer = - new FlinkKafkaConsumer08<>(options.getKafkaTopic(), - new SimpleStringSchema(), getKafkaProps(options)); - - p - .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer))).setCoder(StringUtf8Coder.of()) - .apply(ParDo.of(new PrintFn<>())); - - p.run(); - - } - - } - - /** - * Write String data to Kafka. - */ - public static class WriteStringToKafka { - - public static void main(String[] args) { - - Pipeline p = initializePipeline(args); - KafkaOptions options = getOptions(p); - - PCollection<String> words = - p.apply(Create.of("These", "are", "some", "words")); - - FlinkKafkaProducer08<String> kafkaSink = - new FlinkKafkaProducer08<>(options.getKafkaTopic(), - new SimpleStringSchema(), getKafkaProps(options)); - - words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink))); - - p.run(); - } - - } - } - - /** - * Read/Write Avro data to Kafka. 
- */ - public static class KafkaAvro { - - /** - * Read Avro data from Kafka. - */ - public static class ReadAvroFromKafka { - - public static void main(String[] args) { - - Pipeline p = initializePipeline(args); - KafkaOptions options = getOptions(p); - - FlinkKafkaConsumer08<MyType> kafkaConsumer = - new FlinkKafkaConsumer08<>(options.getKafkaAvroTopic(), - new AvroSerializationDeserializationSchema<>(MyType.class), getKafkaProps(options)); - - p - .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer))) - .setCoder(AvroCoder.of(MyType.class)) - .apply(ParDo.of(new PrintFn<>())); - - p.run(); - - } - - } - - /** - * Write Avro data to Kafka. - */ - public static class WriteAvroToKafka { - - public static void main(String[] args) { - - Pipeline p = initializePipeline(args); - KafkaOptions options = getOptions(p); - - PCollection<MyType> words = - p.apply(Create.of( - new MyType("word", 1L), - new MyType("another", 2L), - new MyType("yet another", 3L))); - - FlinkKafkaProducer08<MyType> kafkaSink = - new FlinkKafkaProducer08<>(options.getKafkaAvroTopic(), - new AvroSerializationDeserializationSchema<>(MyType.class), getKafkaProps(options)); - - words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink))); - - p.run(); - - } - } - - /** - * Serialiation/Deserialiation schema for Avro types. 
- * @param <T> the type being encoded - */ - static class AvroSerializationDeserializationSchema<T> - implements SerializationSchema<T>, DeserializationSchema<T> { - - private final Class<T> avroType; - - private final AvroCoder<T> coder; - private transient ByteArrayOutputStream out; - - AvroSerializationDeserializationSchema(Class<T> clazz) { - this.avroType = clazz; - this.coder = AvroCoder.of(clazz); - this.out = new ByteArrayOutputStream(); - } - - @Override - public byte[] serialize(T element) { - if (out == null) { - out = new ByteArrayOutputStream(); - } - try { - out.reset(); - coder.encode(element, out, Coder.Context.NESTED); - } catch (IOException e) { - throw new RuntimeException("Avro encoding failed.", e); - } - return out.toByteArray(); - } - - @Override - public T deserialize(byte[] message) throws IOException { - return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED); - } - - @Override - public boolean isEndOfStream(T nextElement) { - return false; - } - - @Override - public TypeInformation<T> getProducedType() { - return TypeExtractor.getForClass(avroType); - } - } - - /** - * Custom type for Avro serialization. - */ - static class MyType implements Serializable { - - public MyType() {} - - MyType(String word, long count) { - this.word = word; - this.count = count; - } - - String word; - long count; - - @Override - public String toString() { - return "MyType{" - + "word='" + word + '\'' - + ", count=" + count - + '}'; - } - } - } - - // -------------- Utilities -------------- - - /** - * Custom options for the Pipeline. 
- */ - public interface KafkaOptions extends FlinkPipelineOptions { - @Description("The Kafka topic to read from") - @Default.String(KAFKA_TOPIC) - String getKafkaTopic(); - - void setKafkaTopic(String value); - - void setKafkaAvroTopic(String value); - - @Description("The Kafka topic to read from") - @Default.String(KAFKA_AVRO_TOPIC) - String getKafkaAvroTopic(); - - @Description("The Kafka Broker to read from") - @Default.String(KAFKA_BROKER) - String getBroker(); - - void setBroker(String value); - - @Description("The Zookeeper server to connect to") - @Default.String(ZOOKEEPER) - String getZookeeper(); - - void setZookeeper(String value); - - @Description("The groupId") - @Default.String(GROUP_ID) - String getGroup(); - - void setGroup(String value); - } - - /** - * Initializes some options for the Flink runner. - * @param args The command line args - * @return the pipeline - */ - private static Pipeline initializePipeline(String[] args) { - KafkaOptions options = - PipelineOptionsFactory.fromArgs(args).as(KafkaOptions.class); - - options.setStreaming(true); - options.setRunner(FlinkRunner.class); - - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - - return Pipeline.create(options); - } - - /** - * Gets KafkaOptions from the Pipeline. - * @param p the pipeline - * @return KafkaOptions - */ - private static KafkaOptions getOptions(Pipeline p) { - return p.getOptions().as(KafkaOptions.class); - } - - /** - * Helper method to set the Kafka props from the pipeline options. - * @param options KafkaOptions - * @return Kafka props - */ - private static Properties getKafkaProps(KafkaOptions options) { - - Properties props = new Properties(); - props.setProperty("zookeeper.connect", options.getZookeeper()); - props.setProperty("bootstrap.servers", options.getBroker()); - props.setProperty("group.id", options.getGroup()); - - return props; - } - - /** - * Print contents to stdout. 
- * @param <T> type of the input - */ - private static class PrintFn<T> extends DoFn<T, T> { - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - System.out.println(c.element().toString()); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java deleted file mode 100644 index ee0e874..0000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.examples.streaming; - -import java.util.Properties; -import org.apache.beam.runners.flink.FlinkRunner; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.joda.time.Duration; - -/** - * Wordcount example using Kafka topic. - */ -public class KafkaWindowedWordCountExample { - - static final String KAFKA_TOPIC = "test"; // Default kafka topic to read from - static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact - static final String GROUP_ID = "myGroup"; // Default groupId - static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka - - /** - * Function to extract words. 
- */ - public static class ExtractWordsFn extends DoFn<String, String> { - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", Sum.ofLongs()); - - @ProcessElement - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - /** - * Function to format KV as String. - */ - public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { - @ProcessElement - public void processElement(ProcessContext c) { - String row = c.element().getKey() + " - " + c.element().getValue() + " @ " - + c.timestamp().toString(); - System.out.println(row); - c.output(row); - } - } - - /** - * Pipeline options. - */ - public interface KafkaStreamingWordCountOptions - extends WindowedWordCount.StreamingWordCountOptions { - @Description("The Kafka topic to read from") - @Default.String(KAFKA_TOPIC) - String getKafkaTopic(); - - void setKafkaTopic(String value); - - @Description("The Kafka Broker to read from") - @Default.String(KAFKA_BROKER) - String getBroker(); - - void setBroker(String value); - - @Description("The Zookeeper server to connect to") - @Default.String(ZOOKEEPER) - String getZookeeper(); - - void setZookeeper(String value); - - @Description("The groupId") - @Default.String(GROUP_ID) - String getGroup(); - - void setGroup(String value); - - } - - public static void main(String[] args) { - PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class); - KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args) - .as(KafkaStreamingWordCountOptions.class); - options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds"); - options.setStreaming(true); - 
options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - options.setRunner(FlinkRunner.class); - - System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " " - + options.getBroker() + " " + options.getGroup()); - Pipeline pipeline = Pipeline.create(options); - - Properties p = new Properties(); - p.setProperty("zookeeper.connect", options.getZookeeper()); - p.setProperty("bootstrap.servers", options.getBroker()); - p.setProperty("group.id", options.getGroup()); - - // this is the Flink consumer that reads the input to - // the program from a kafka topic. - FlinkKafkaConsumer08<String> kafkaConsumer = new FlinkKafkaConsumer08<>( - options.getKafkaTopic(), - new SimpleStringSchema(), p); - - PCollection<String> words = pipeline - .apply("StreamingWordCount", Read.from(UnboundedFlinkSource.of(kafkaConsumer))) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Window.<String>into(FixedWindows.of( - Duration.standardSeconds(options.getWindowSize()))) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - - PCollection<KV<String, Long>> wordCounts = - words.apply(Count.<String>perElement()); - - wordCounts.apply(ParDo.of(new FormatAsStringFn())) - .apply(TextIO.Write.to("./outputKafka.txt")); - - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index fbd7620..123d5e7 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -36,7 +36,6 @@ import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.runners.flink.translation.types.FlinkCoder; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector; import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem; @@ -45,17 +44,13 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDo import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; @@ -69,7 +64,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; 
import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.AppliedCombineFn; @@ -94,12 +88,10 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SplitStream; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.util.Collector; -import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,7 +116,6 @@ class FlinkStreamingTransformTranslators { static { TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator()); TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator()); - TRANSLATORS.put(Write.class, new WriteSinkStreamingTranslator()); TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator()); @@ -203,31 +194,6 @@ class FlinkStreamingTransformTranslators { } } - private static class WriteSinkStreamingTranslator<T> - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write<T>> { - - @Override - public void translateNode(Write<T> transform, FlinkStreamingTranslationContext context) { - String name = transform.getName(); - PValue input = context.getInput(transform); - - Sink<T> sink = transform.getSink(); - if (!(sink instanceof UnboundedFlinkSink)) { - 
throw new UnsupportedOperationException( - "At the time, only unbounded Flink sinks are supported."); - } - - DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input); - - inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() { - @Override - public void flatMap(WindowedValue<T> value, Collector<Object> out) throws Exception { - out.collect(value.getValue()); - } - }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name); - } - } - private static class UnboundedReadSourceTranslator<T> extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> { @@ -241,47 +207,18 @@ class FlinkStreamingTransformTranslators { context.getTypeInfo(context.getOutput(transform)); DataStream<WindowedValue<T>> source; - if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) { - @SuppressWarnings("unchecked") - UnboundedFlinkSource<T> flinkSourceFunction = - (UnboundedFlinkSource<T>) transform.getSource(); - - final AssignerWithPeriodicWatermarks<T> flinkAssigner = - flinkSourceFunction.getFlinkTimestampAssigner(); - - DataStream<T> flinkSource = context.getExecutionEnvironment() - .addSource(flinkSourceFunction.getFlinkSource()); - - flinkSourceFunction.setCoder( - new FlinkCoder<T>(flinkSource.getType(), - context.getExecutionEnvironment().getConfig())); - - source = flinkSource - .assignTimestampsAndWatermarks(flinkAssigner) - .flatMap(new FlatMapFunction<T, WindowedValue<T>>() { - @Override - public void flatMap(T s, Collector<WindowedValue<T>> collector) throws Exception { - collector.collect( - WindowedValue.of( - s, - new Instant(flinkAssigner.extractTimestamp(s, -1)), - GlobalWindow.INSTANCE, - PaneInfo.NO_FIRING)); - }}).returns(outputTypeInfo); - } else { - try { - UnboundedSourceWrapper<T, ?> sourceWrapper = - new UnboundedSourceWrapper<>( - context.getPipelineOptions(), - transform.getSource(), - context.getExecutionEnvironment().getParallelism()); - source = context - 
.getExecutionEnvironment() - .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo); - } catch (Exception e) { - throw new RuntimeException( - "Error while translating UnboundedSource: " + transform.getSource(), e); - } + try { + UnboundedSourceWrapper<T, ?> sourceWrapper = + new UnboundedSourceWrapper<>( + context.getPipelineOptions(), + transform.getSource(), + context.getExecutionEnvironment().getParallelism()); + source = context + .getExecutionEnvironment() + .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo); + } catch (Exception e) { + throw new RuntimeException( + "Error while translating UnboundedSource: " + transform.getSource(), e); } context.setOutputDataStream(output, source); http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java deleted file mode 100644 index 8b90c73..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.types; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.List; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; - -/** - * A Coder that uses Flink's serialization system. - * @param <T> The type of the value to be encoded - */ -public class FlinkCoder<T> extends StandardCoder<T> { - - private final TypeSerializer<T> typeSerializer; - - public FlinkCoder(TypeInformation<T> typeInformation, ExecutionConfig executionConfig) { - this.typeSerializer = typeInformation.createSerializer(executionConfig); - } - - @Override - public void encode(T value, OutputStream outStream, Context context) throws IOException { - typeSerializer.serialize(value, new DataOutputViewStreamWrapper(outStream)); - } - - @Override - public T decode(InputStream inStream, Context context) throws IOException { - return typeSerializer.deserialize(new DataInputViewStreamWrapper(inStream)); - } - - @Override - public List<? 
extends Coder<?>> getCoderArguments() { - return Collections.emptyList(); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java deleted file mode 100644 index af36b80..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io; - - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collection; -import java.util.List; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.io.Sink; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.CloudObject; -import org.apache.beam.sdk.util.common.ElementByteSizeObserver; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; - -/** - * A wrapper translating Flink sinks implementing the {@link SinkFunction} interface, into - * unbounded Beam sinks (see {@link UnboundedSource}). - * */ -public class UnboundedFlinkSink<T> extends Sink<T> { - - /* The Flink sink function */ - private final SinkFunction<T> flinkSink; - - private UnboundedFlinkSink(SinkFunction<T> flinkSink) { - this.flinkSink = flinkSink; - } - - public SinkFunction<T> getFlinkSource() { - return this.flinkSink; - } - - @Override - public void validate(PipelineOptions options) { - } - - @Override - public WriteOperation<T, ?> createWriteOperation(PipelineOptions options) { - return new WriteOperation<T, Object>() { - @Override - public void initialize(PipelineOptions options) throws Exception { - - } - - @Override - public void setWindowedWrites(boolean windowedWrites) { - } - - @Override - public void finalize(Iterable<Object> writerResults, PipelineOptions options) - throws Exception { - - } - - @Override - public Coder<Object> getWriterResultCoder() { - return new Coder<Object>() { - @Override - public void encode(Object value, OutputStream outStream, Context context) - throws CoderException, IOException { - - } - - @Override - 
public Object decode(InputStream inStream, Context context) - throws CoderException, IOException { - return null; - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return null; - } - - @Override - public CloudObject asCloudObject() { - return null; - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - - } - - @Override - public boolean consistentWithEquals() { - return false; - } - - @Override - public Object structuralValue(Object value) throws Exception { - return null; - } - - @Override - public boolean isRegisterByteSizeObserverCheap(Object value, Context context) { - return false; - } - - @Override - public void registerByteSizeObserver(Object value, - ElementByteSizeObserver observer, - Context context) throws Exception { - - } - - @Override - public String getEncodingId() { - return null; - } - - @Override - public Collection<String> getAllowedEncodings() { - return null; - } - - @Override - public TypeDescriptor<Object> getEncodedTypeDescriptor() { - return TypeDescriptor.of(Object.class); - } - }; - } - - @Override - public Writer<T, Object> createWriter(PipelineOptions options) throws Exception { - return new Writer<T, Object>() { - @Override - public void openWindowed(String uId, - BoundedWindow window, - PaneInfo paneInfo, - int shard, - int numShards) throws Exception { - } - - @Override - public void openUnwindowed(String uId, int shard, int numShards) throws Exception { - } - - @Override - public void cleanup() throws Exception { - - } - - @Override - public void write(T value) throws Exception { - - } - - @Override - public Object close() throws Exception { - return null; - } - - @Override - public WriteOperation<T, Object> getWriteOperation() { - return null; - } - - }; - } - - @Override - public Sink<T> getSink() { - return UnboundedFlinkSink.this; - } - }; - } - - /** - * Creates a Flink sink to write to using the Write API. - * @param flinkSink The Flink sink, e.g. 
FlinkKafkaProducer09 - * @param <T> The input type of the sink - * @return A Beam sink wrapping a Flink sink - */ - public static <T> Sink<T> of(SinkFunction<T> flinkSink) { - return new UnboundedFlinkSink<>(flinkSink); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java deleted file mode 100644 index ac20c34..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.List; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; -import org.apache.flink.streaming.api.functions.source.SourceFunction; - -/** - * A wrapper translating Flink Sources implementing the {@link SourceFunction} interface, into - * unbounded Beam sources (see {@link UnboundedSource}). - * */ -public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> { - - private final SourceFunction<T> flinkSource; - - /** Coder set during translation. */ - private Coder<T> coder; - - /** Timestamp / watermark assigner for source; defaults to ingestion time. */ - private AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner = - new IngestionTimeExtractor<T>(); - - public UnboundedFlinkSource(SourceFunction<T> source) { - flinkSource = checkNotNull(source); - } - - public UnboundedFlinkSource(SourceFunction<T> source, - AssignerWithPeriodicWatermarks<T> timestampAssigner) { - flinkSource = checkNotNull(source); - flinkTimestampAssigner = checkNotNull(timestampAssigner); - } - - public SourceFunction<T> getFlinkSource() { - return this.flinkSource; - } - - public AssignerWithPeriodicWatermarks<T> getFlinkTimestampAssigner() { - return flinkTimestampAssigner; - } - - @Override - public List<? 
extends UnboundedSource<T, UnboundedSource.CheckpointMark>> generateInitialSplits( - int desiredNumSplits, - PipelineOptions options) throws Exception { - throw new RuntimeException("Flink Sources are supported only when " - + "running with the FlinkRunner."); - } - - @Override - public UnboundedReader<T> createReader(PipelineOptions options, - @Nullable CheckpointMark checkpointMark) { - throw new RuntimeException("Flink Sources are supported only when " - + "running with the FlinkRunner."); - } - - @Nullable - @Override - public Coder<UnboundedSource.CheckpointMark> getCheckpointMarkCoder() { - throw new RuntimeException("Flink Sources are supported only when " - + "running with the FlinkRunner."); - } - - - @Override - public void validate() { - } - - @Override - public Coder<T> getDefaultOutputCoder() { - // The coder derived from the Flink source - return coder; - } - - public void setCoder(Coder<T> coder) { - this.coder = coder; - } - - public void setFlinkTimestampAssigner(AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner) { - this.flinkTimestampAssigner = flinkTimestampAssigner; - } - - /** - * Creates a new unbounded source from a Flink source. - * @param flinkSource The Flink source function - * @param <T> The type that the source function produces. - * @return The wrapped source function. - */ - public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of( - SourceFunction<T> flinkSource) { - return new UnboundedFlinkSource<>(flinkSource); - } - - public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of( - SourceFunction<T> flinkSource, AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner) { - return new UnboundedFlinkSource<>(flinkSource, flinkTimestampAssigner); - } -}