[ https://issues.apache.org/jira/browse/FLINK-6539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019293#comment-16019293 ]
ASF GitHub Bot commented on FLINK-6539: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3911#discussion_r117691205 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java --- @@ -17,49 +17,74 @@ package org.apache.flink.streaming.examples.kafka; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; /** - * Read Strings from Kafka and print them to standard out. - * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file! - * - * Please pass the following arguments to run the example: - * --topic test --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer + * An example that shows how to read from and write to Kafka. This will read String messages + * from the input topic, prefix them by a configured prefix and output to the output topic. * + * <p>Example usage: + * --input-topic test-input --output-topic test-outpu --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer */ -public class ReadFromKafka { +public class Kafka010Example { public static void main(String[] args) throws Exception { // parse input arguments final ParameterTool parameterTool = ParameterTool.fromArgs(args); - if(parameterTool.getNumberOfParameters() < 4) { - System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " + - "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>"); + if (parameterTool.getNumberOfParameters() < 5) { + System.out.println("Missing parameters!\n" + + "Usage: Kafka --input-topic <topic> --output-topic <topic> " + + "--bootstrap.servers <kafka brokers> " + + "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]"); return; } + String prefix = parameterTool.get("prefix", "PREFIX:"); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().disableSysoutLogging(); env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); - env.enableCheckpointing(5000); // create a checkpoint every 5 seconds - env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface +// env.enableCheckpointing(5000); // create a checkpoint every 5 seconds --- End diff -- It wasn't working because of https://issues.apache.org/jira/browse/FLINK-6515, which is now fixed so will change that. > Add automated end-to-end tests > ------------------------------ > > Key: FLINK-6539 > URL: https://issues.apache.org/jira/browse/FLINK-6539 > Project: Flink > Issue Type: Improvement > Components: Tests > Reporter: Aljoscha Krettek > Assignee: Aljoscha Krettek > > We should add simple tests that exercise all the paths that a user would use > when starting a cluster and submitting a program. Preferably with a simple > batch program and a streaming program that uses Kafka. > This would have catched some of the bugs that we now discovered right before > the release. -- This message was sent by Atlassian JIRA (v6.3.15#6346)