[FLINK-6539] Combine ReadFromKafka/WriteIntoKafka into one Kafka010Example

This also updates the Kafka version we use in the examples module to
0.10.x.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c86f71be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c86f71be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c86f71be

Branch: refs/heads/master
Commit: c86f71be9d83290c60a8dd63afe92ef9dcebac9c
Parents: 4f1c764
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Tue May 9 14:46:39 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Mon Jul 24 11:18:06 2017 +0200

----------------------------------------------------------------------
 flink-examples/flink-examples-streaming/pom.xml |  8 +-
 .../examples/kafka/Kafka010Example.java         | 90 ++++++++++++++++++++
 .../streaming/examples/kafka/ReadFromKafka.java | 64 --------------
 .../examples/kafka/WriteIntoKafka.java          | 73 ----------------
 .../scala/examples/kafka/Kafka010Example.scala  | 88 +++++++++++++++++++
 .../scala/examples/kafka/ReadFromKafka.scala    | 73 ----------------
 .../scala/examples/kafka/WriteIntoKafka.scala   | 84 ------------------
 7 files changed, 182 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c86f71be/flink-examples/flink-examples-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/pom.xml 
b/flink-examples/flink-examples-streaming/pom.xml
index b023cd5..eba81d3 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -58,7 +58,7 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId>
+                       
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                </dependency>
 
@@ -488,7 +488,7 @@ under the License.
                                <artifactId>maven-shade-plugin</artifactId>
                                <executions>
                                        <execution>
-                                               <id>fat-jar-kafka-example</id>
+                                               
<id>fat-jar-kafka-010-example</id>
                                                <phase>package</phase>
                                                <goals>
                                                        <goal>shade</goal>
@@ -499,10 +499,10 @@ under the License.
                                                        
<createDependencyReducedPom>false</createDependencyReducedPom>
                                                        <transformers>
                                                                <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                                                       
<mainClass>org.apache.flink.streaming.examples.kafka.ReadFromKafka</mainClass>
+                                                                       
<mainClass>org.apache.flink.streaming.examples.kafka.Kafka010Example</mainClass>
                                                                </transformer>
                                                        </transformers>
-                                                       
<finalName>Kafka</finalName>
+                                                       
<finalName>Kafka010Example</finalName>
                                                        <!-- 
<outputFile>Kafka.jar</outputFile> -->
                                                        <filters>
                                                                <filter>

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f71be/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
new file mode 100644
index 0000000..b5abbc5
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+
+/**
+ * An example that shows how to read from and write to Kafka. This will read String messages
+ * from the input topic, prefix them by a configured prefix and output to the output topic.
+ *
+ * <p>Example usage (the optional {@code --prefix} argument defaults to {@code PREFIX:}):
+ *     --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
+ */
+public class Kafka010Example {
+
+       public static void main(String[] args) throws Exception {
+               // parse input arguments
+               final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+               if (parameterTool.getNumberOfParameters() < 5) {
+                       System.out.println("Missing parameters!\n" +
+                                       "Usage: Kafka --input-topic <topic> --output-topic <topic> " +
+                                       "--bootstrap.servers <kafka brokers> " +
+                                       "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]");
+                       return;
+               }
+
+               String prefix = parameterTool.get("prefix", "PREFIX:");
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().disableSysoutLogging();
+               env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
+               env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
+               env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
+
+               DataStream<String> input = env
+                               .addSource(new FlinkKafkaConsumer010<>(
+                                               parameterTool.getRequired("input-topic"),
+                                               new SimpleStringSchema(),
+                                               parameterTool.getProperties()))
+                               .map(new PrefixingMapper(prefix));
+
+               input.addSink(
+                               new FlinkKafkaProducer010<>(
+                                               parameterTool.getRequired("output-topic"),
+                                               new SimpleStringSchema(),
+                                               parameterTool.getProperties()));
+
+               env.execute("Kafka 0.10 Example");
+       }
+
+       /** Prepends the configured prefix to every record it maps. */
+       private static class PrefixingMapper implements MapFunction<String, String> {
+               private static final long serialVersionUID = 1L;
+
+               private final String prefix;
+
+               public PrefixingMapper(String prefix) {
+                       this.prefix = prefix;
+               }
+
+               @Override
+               public String map(String value) throws Exception {
+                       return prefix + value;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f71be/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java
deleted file mode 100644
index f9cf42b..0000000
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.kafka;
-
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-
-/**
- * Read Strings from Kafka and print them to standard out.
- * Note: On a cluster, DataStream.print() will print to the TaskManager's .out 
file!
- *
- * <p>Please pass the following arguments to run the example:
- *     --topic test --bootstrap.servers localhost:9092 --zookeeper.connect 
localhost:2181 --group.id myconsumer
- */
-public class ReadFromKafka {
-
-       public static void main(String[] args) throws Exception {
-               // parse input arguments
-               final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
-
-               if (parameterTool.getNumberOfParameters() < 4) {
-                       System.out.println("Missing parameters!\nUsage: Kafka 
--topic <topic> " +
-                                       "--bootstrap.servers <kafka brokers> 
--zookeeper.connect <zk quorum> --group.id <some id>");
-                       return;
-               }
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().disableSysoutLogging();
-               
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
10000));
-               env.enableCheckpointing(5000); // create a checkpoint every 5 
seconds
-               env.getConfig().setGlobalJobParameters(parameterTool); // make 
parameters available in the web interface
-
-               DataStream<String> messageStream = env
-                               .addSource(new FlinkKafkaConsumer08<>(
-                                               
parameterTool.getRequired("topic"),
-                                               new SimpleStringSchema(),
-                                               parameterTool.getProperties()));
-
-               // write kafka stream to standard out.
-               messageStream.print();
-
-               env.execute("Read from Kafka example");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f71be/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java
deleted file mode 100644
index f9b4656..0000000
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.kafka;
-
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-/**
- * Generate a String every 500 ms and write it into a Kafka topic.
- *
- * <p>Please pass the following arguments to run the example:
- *     --topic test --bootstrap.servers localhost:9092
- */
-public class WriteIntoKafka {
-
-       public static void main(String[] args) throws Exception {
-               ParameterTool parameterTool = ParameterTool.fromArgs(args);
-               if (parameterTool.getNumberOfParameters() < 2) {
-                       System.out.println("Missing parameters!");
-                       System.out.println("Usage: Kafka --topic <topic> 
--bootstrap.servers <kafka brokers>");
-                       return;
-               }
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().disableSysoutLogging();
-               
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
10000));
-
-               // very simple data generator
-               DataStream<String> messageStream = env.addSource(new 
SourceFunction<String>() {
-                       private static final long serialVersionUID = 
6369260445318862378L;
-                       public boolean running = true;
-
-                       @Override
-                       public void run(SourceContext<String> ctx) throws 
Exception {
-                               long i = 0;
-                               while (this.running) {
-                                       ctx.collect("Element - " + i++);
-                                       Thread.sleep(500);
-                               }
-                       }
-
-                       @Override
-                       public void cancel() {
-                               running = false;
-                       }
-               });
-
-               // write data into Kafka
-               messageStream.addSink(new 
FlinkKafkaProducer08<>(parameterTool.getRequired("topic"), new 
SimpleStringSchema(), parameterTool.getProperties()));
-
-               env.execute("Write into Kafka example");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f71be/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
new file mode 100644
index 0000000..2a52811
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.kafka
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, 
FlinkKafkaProducer010}
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema
+
+/**
+ * An example that shows how to read from and write to Kafka. This will read String messages
+ * from the input topic, prefix them by a configured prefix and output to the output topic.
+ *
+ * Please pass the following arguments to run the example:
+ * {{{
+ * --input-topic test-input
+ * --output-topic test-output
+ * --bootstrap.servers localhost:9092
+ * --zookeeper.connect localhost:2181
+ * --group.id myconsumer
+ * }}}
+ */
+object Kafka010Example {
+
+  def main(args: Array[String]): Unit = {
+    // parse input arguments
+    val params = ParameterTool.fromArgs(args)
+
+    // all five arguments listed in the usage text are required; --prefix is optional
+    if (params.getNumberOfParameters < 5) {
+      println("Missing parameters!\n"
+        + "Usage: Kafka --input-topic <topic> --output-topic <topic> "
+        + "--bootstrap.servers <kafka brokers> "
+        + "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]")
+      return
+    }
+
+    // optional prefix, falls back to "PREFIX:" when not given
+    val prefix = params.get("prefix", "PREFIX:")
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.disableSysoutLogging()
+    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
+    // create a checkpoint every 5 seconds
+    env.enableCheckpointing(5000)
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
+    // create a Kafka streaming source consumer for Kafka 0.10.x
+    val kafkaConsumer = new FlinkKafkaConsumer010(
+      params.getRequired("input-topic"),
+      new SimpleStringSchema,
+      params.getProperties)
+
+    val messageStream = env
+      .addSource(kafkaConsumer)
+      .map(in => prefix + in)
+
+    // create a Kafka producer for Kafka 0.10.x
+    val kafkaProducer = new FlinkKafkaProducer010(
+      params.getRequired("output-topic"),
+      new SimpleStringSchema,
+      params.getProperties)
+
+    // write data into Kafka
+    messageStream.addSink(kafkaProducer)
+
+    env.execute("Kafka 0.10 Example")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f71be/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala
 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala
deleted file mode 100644
index 3127ab7..0000000
--- 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.examples.kafka
-
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema
-
-/**
- * Read Strings from Kafka and print them to standard out.
- * Note: On a cluster, DataStream.print() will print to the TaskManager's .out 
file!
- *
- * Please pass the following arguments to run the example:
- * {{{
- * --topic test
- * --bootstrap.servers localhost:9092
- * --zookeeper.connect localhost:2181
- * --group.id myconsumer
- * }}}
- */
-object ReadFromKafka {
-
-  def main(args: Array[String]): Unit = {
-
-    // parse input arguments
-    val params = ParameterTool.fromArgs(args)
-
-    if (params.getNumberOfParameters < 4) {
-      println("Missing parameters!\nUsage: Kafka --topic <topic> " +
-        "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> 
--group.id <some id>")
-      return
-    }
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.getConfig.disableSysoutLogging
-    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
10000))
-    // create a checkpoint every 5 seconds
-    env.enableCheckpointing(5000)
-    // make parameters available in the web interface
-    env.getConfig.setGlobalJobParameters(params)
-
-    // create a Kafka streaming source consumer for Kafka 0.8.x
-    val kafkaConsumer = new FlinkKafkaConsumer08(
-      params.getRequired("topic"),
-      new SimpleStringSchema,
-      params.getProperties)
-    val messageStream = env.addSource(kafkaConsumer)
-
-    // write kafka stream to standard out.
-    messageStream.print()
-
-    env.execute("Read from Kafka example")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f71be/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala
 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala
deleted file mode 100644
index e34083a..0000000
--- 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.examples.kafka
-
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema
-
-/**
- * Generate a String every 500 ms and write it into a Kafka topic
- *
- * Please pass the following arguments to run the example:
- * {{{
- * --topic test
- * --bootstrap.servers
- * localhost:9092
- * }}}
- */
-object WriteIntoKafka {
-
-  def main(args: Array[String]): Unit = {
-
-    // parse input arguments
-    val params = ParameterTool.fromArgs(args)
-
-    if (params.getNumberOfParameters < 2) {
-      println("Missing parameters!")
-      println("Usage: Kafka --topic <topic> --bootstrap.servers <kafka 
brokers>")
-      return
-    }
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.getConfig.disableSysoutLogging
-    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
10000))
-
-    // very simple data generator
-    val messageStream: DataStream[String] = env.addSource(new 
SourceFunction[String]() {
-      var running = true
-
-      override def run(ctx: SourceContext[String]): Unit = {
-        var i = 0L
-        while (this.running) {
-          ctx.collect(s"Element - ${i}")
-          i += 1
-          Thread.sleep(500)
-        }
-      }
-
-      override def cancel(): Unit = running = false
-    })
-
-    // create a Kafka producer for Kafka 0.8.x
-    val kafkaProducer = new FlinkKafkaProducer08(
-      params.getRequired("topic"),
-      new SimpleStringSchema,
-      params.getProperties)
-
-    // write data into Kafka
-    messageStream.addSink(kafkaProducer)
-
-    env.execute("Write into Kafka example")
-  }
-
-}

Reply via email to