[FLINK-6539] Combine ReadFromKafka/WriteIntoKafka into one Kafka010Example. This also updates the Kafka version we use in the examples module to 0.10.x.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c86f71be Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c86f71be Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c86f71be Branch: refs/heads/master Commit: c86f71be9d83290c60a8dd63afe92ef9dcebac9c Parents: 4f1c764 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Tue May 9 14:46:39 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Mon Jul 24 11:18:06 2017 +0200 ---------------------------------------------------------------------- flink-examples/flink-examples-streaming/pom.xml | 8 +- .../examples/kafka/Kafka010Example.java | 90 ++++++++++++++++++++ .../streaming/examples/kafka/ReadFromKafka.java | 64 -------------- .../examples/kafka/WriteIntoKafka.java | 73 ---------------- .../scala/examples/kafka/Kafka010Example.scala | 88 +++++++++++++++++++ .../scala/examples/kafka/ReadFromKafka.scala | 73 ---------------- .../scala/examples/kafka/WriteIntoKafka.scala | 84 ------------------ 7 files changed, 182 insertions(+), 298 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c86f71be/flink-examples/flink-examples-streaming/pom.xml ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml index b023cd5..eba81d3 100644 --- a/flink-examples/flink-examples-streaming/pom.xml +++ b/flink-examples/flink-examples-streaming/pom.xml @@ -58,7 +58,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId> + <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> @@ -488,7 +488,7 @@ under the License. 
<artifactId>maven-shade-plugin</artifactId> <executions> <execution> - <id>fat-jar-kafka-example</id> + <id>fat-jar-kafka-010-example</id> <phase>package</phase> <goals> <goal>shade</goal> @@ -499,10 +499,10 @@ under the License. <createDependencyReducedPom>false</createDependencyReducedPom> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>org.apache.flink.streaming.examples.kafka.ReadFromKafka</mainClass> + <mainClass>org.apache.flink.streaming.examples.kafka.Kafka010Example</mainClass> </transformer> </transformers> - <finalName>Kafka</finalName> + <finalName>Kafka010Example</finalName> <!-- <outputFile>Kafka.jar</outputFile> --> <filters> <filter> http://git-wip-us.apache.org/repos/asf/flink/blob/c86f71be/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java new file mode 100644 index 0000000..b5abbc5 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.kafka; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + + +/** + * An example that shows how to read from and write to Kafka. This will read String messages + * from the input topic, prefix them by a configured prefix and output to the output topic. 
+ * + * <p>Example usage: + * --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer + */ +public class Kafka010Example { + + public static void main(String[] args) throws Exception { + // parse input arguments + final ParameterTool parameterTool = ParameterTool.fromArgs(args); + + if (parameterTool.getNumberOfParameters() < 5) { + System.out.println("Missing parameters!\n" + + "Usage: Kafka --input-topic <topic> --output-topic <topic> " + + "--bootstrap.servers <kafka brokers> " + + "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]"); + return; + } + + String prefix = parameterTool.get("prefix", "PREFIX:"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); + env.enableCheckpointing(5000); // create a checkpoint every 5 seconds + env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface + + // make parameters available in the web interface + env.getConfig().setGlobalJobParameters(parameterTool); + + DataStream<String> input = env + .addSource(new FlinkKafkaConsumer010<>( + parameterTool.getRequired("input-topic"), + new SimpleStringSchema(), + parameterTool.getProperties())) + .map(new PrefixingMapper(prefix)); + + input.addSink( + new FlinkKafkaProducer010<>( + parameterTool.getRequired("output-topic"), + new SimpleStringSchema(), + parameterTool.getProperties())); + + env.execute("Kafka 0.10 Example"); + } + + private static class PrefixingMapper implements MapFunction<String, String> { + private final String prefix; + + public PrefixingMapper(String prefix) { + this.prefix = prefix; + } + + @Override + public String map(String value) throws Exception { + return prefix + value; + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f71be/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java deleted file mode 100644 index f9cf42b..0000000 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.examples.kafka; - -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; - - -/** - * Read Strings from Kafka and print them to standard out. - * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file! - * - * <p>Please pass the following arguments to run the example: - * --topic test --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer - */ -public class ReadFromKafka { - - public static void main(String[] args) throws Exception { - // parse input arguments - final ParameterTool parameterTool = ParameterTool.fromArgs(args); - - if (parameterTool.getNumberOfParameters() < 4) { - System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " + - "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>"); - return; - } - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().disableSysoutLogging(); - env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); - env.enableCheckpointing(5000); // create a checkpoint every 5 seconds - env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface - - DataStream<String> messageStream = env - .addSource(new FlinkKafkaConsumer08<>( - parameterTool.getRequired("topic"), - new SimpleStringSchema(), - parameterTool.getProperties())); - - // write kafka stream to standard out. 
- messageStream.print(); - - env.execute("Read from Kafka example"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c86f71be/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java deleted file mode 100644 index f9b4656..0000000 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.examples.kafka; - -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; - -/** - * Generate a String every 500 ms and write it into a Kafka topic. - * - * <p>Please pass the following arguments to run the example: - * --topic test --bootstrap.servers localhost:9092 - */ -public class WriteIntoKafka { - - public static void main(String[] args) throws Exception { - ParameterTool parameterTool = ParameterTool.fromArgs(args); - if (parameterTool.getNumberOfParameters() < 2) { - System.out.println("Missing parameters!"); - System.out.println("Usage: Kafka --topic <topic> --bootstrap.servers <kafka brokers>"); - return; - } - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().disableSysoutLogging(); - env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); - - // very simple data generator - DataStream<String> messageStream = env.addSource(new SourceFunction<String>() { - private static final long serialVersionUID = 6369260445318862378L; - public boolean running = true; - - @Override - public void run(SourceContext<String> ctx) throws Exception { - long i = 0; - while (this.running) { - ctx.collect("Element - " + i++); - Thread.sleep(500); - } - } - - @Override - public void cancel() { - running = false; - } - }); - - // write data into Kafka - messageStream.addSink(new FlinkKafkaProducer08<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties())); - - env.execute("Write into Kafka 
example"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c86f71be/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala new file mode 100644 index 0000000..2a52811 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.kafka + +import org.apache.flink.api.common.restartstrategy.RestartStrategies +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010} +import org.apache.flink.streaming.util.serialization.SimpleStringSchema + +/** + * An example that shows how to read from and write to Kafka. This will read String messages + * from the input topic, prefix them by a configured prefix and output to the output topic. + * + * Please pass the following arguments to run the example: + * {{{ + * --input-topic test-input + * --output-topic test-output + * --bootstrap.servers localhost:9092 + * --zookeeper.connect localhost:2181 + * --group.id myconsumer + * }}} + */ +object Kafka010Example { + + def main(args: Array[String]): Unit = { + + // parse input arguments + val params = ParameterTool.fromArgs(args) + + if (params.getNumberOfParameters < 4) { + println("Missing parameters!\n" + + "Usage: Kafka --input-topic <topic> --output-topic <topic> " + + "--bootstrap.servers <kafka brokers> " + + "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]") + return + } + + val prefix = params.get("prefix", "PREFIX:") + + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.disableSysoutLogging + env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)) + // create a checkpoint every 5 seconds + env.enableCheckpointing(5000) + // make parameters available in the web interface + env.getConfig.setGlobalJobParameters(params) + + // create a Kafka streaming source consumer for Kafka 0.10.x + val kafkaConsumer = new FlinkKafkaConsumer010( + params.getRequired("input-topic"), + new SimpleStringSchema, + params.getProperties) + + val messageStream = env + .addSource(kafkaConsumer) + .map(in => prefix + in) + + // create a Kafka producer for 
Kafka 0.10.x + val kafkaProducer = new FlinkKafkaProducer010( + params.getRequired("output-topic"), + new SimpleStringSchema, + params.getProperties) + + // write data into Kafka + messageStream.addSink(kafkaProducer) + + env.execute("Kafka 0.10 Example") + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/c86f71be/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala deleted file mode 100644 index 3127ab7..0000000 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.scala.examples.kafka - -import org.apache.flink.api.common.restartstrategy.RestartStrategies -import org.apache.flink.api.java.utils.ParameterTool -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08 -import org.apache.flink.streaming.util.serialization.SimpleStringSchema - -/** - * Read Strings from Kafka and print them to standard out. - * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file! - * - * Please pass the following arguments to run the example: - * {{{ - * --topic test - * --bootstrap.servers localhost:9092 - * --zookeeper.connect localhost:2181 - * --group.id myconsumer - * }}} - */ -object ReadFromKafka { - - def main(args: Array[String]): Unit = { - - // parse input arguments - val params = ParameterTool.fromArgs(args) - - if (params.getNumberOfParameters < 4) { - println("Missing parameters!\nUsage: Kafka --topic <topic> " + - "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>") - return - } - - val env = StreamExecutionEnvironment.getExecutionEnvironment - env.getConfig.disableSysoutLogging - env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)) - // create a checkpoint every 5 seconds - env.enableCheckpointing(5000) - // make parameters available in the web interface - env.getConfig.setGlobalJobParameters(params) - - // create a Kafka streaming source consumer for Kafka 0.8.x - val kafkaConsumer = new FlinkKafkaConsumer08( - params.getRequired("topic"), - new SimpleStringSchema, - params.getProperties) - val messageStream = env.addSource(kafkaConsumer) - - // write kafka stream to standard out. 
- messageStream.print() - - env.execute("Read from Kafka example") - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c86f71be/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala deleted file mode 100644 index e34083a..0000000 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.scala.examples.kafka - -import org.apache.flink.api.common.restartstrategy.RestartStrategies -import org.apache.flink.api.java.utils.ParameterTool -import org.apache.flink.streaming.api.functions.source.SourceFunction -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08 -import org.apache.flink.streaming.util.serialization.SimpleStringSchema - -/** - * Generate a String every 500 ms and write it into a Kafka topic - * - * Please pass the following arguments to run the example: - * {{{ - * --topic test - * --bootstrap.servers - * localhost:9092 - * }}} - */ -object WriteIntoKafka { - - def main(args: Array[String]): Unit = { - - // parse input arguments - val params = ParameterTool.fromArgs(args) - - if (params.getNumberOfParameters < 2) { - println("Missing parameters!") - println("Usage: Kafka --topic <topic> --bootstrap.servers <kafka brokers>") - return - } - - val env = StreamExecutionEnvironment.getExecutionEnvironment - env.getConfig.disableSysoutLogging - env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)) - - // very simple data generator - val messageStream: DataStream[String] = env.addSource(new SourceFunction[String]() { - var running = true - - override def run(ctx: SourceContext[String]): Unit = { - var i = 0L - while (this.running) { - ctx.collect(s"Element - ${i}") - i += 1 - Thread.sleep(500) - } - } - - override def cancel(): Unit = running = false - }) - - // create a Kafka producer for Kafka 0.8.x - val kafkaProducer = new FlinkKafkaProducer08( - params.getRequired("topic"), - new SimpleStringSchema, - params.getProperties) - - // write data into Kafka - messageStream.addSink(kafkaProducer) - - env.execute("Write into Kafka example") - } - -}