[ https://issues.apache.org/jira/browse/FLINK-6539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019293#comment-16019293 ]

ASF GitHub Bot commented on FLINK-6539:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3911#discussion_r117691205
  
    --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java ---
    @@ -17,49 +17,74 @@
     
     package org.apache.flink.streaming.examples.kafka;
     
    +import org.apache.flink.api.common.functions.MapFunction;
     import org.apache.flink.api.common.restartstrategy.RestartStrategies;
     import org.apache.flink.api.java.utils.ParameterTool;
     import org.apache.flink.streaming.api.datastream.DataStream;
     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
     import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
     
     
     /**
    - * Read Strings from Kafka and print them to standard out.
    - * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file!
    - *
    - * Please pass the following arguments to run the example:
    - *         --topic test --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
    + * An example that shows how to read from and write to Kafka. This will read String messages
    + * from the input topic, prefix them by a configured prefix and output to the output topic.
      *
    + * <p>Example usage:
    + *         --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
      */
    -public class ReadFromKafka {
    +public class Kafka010Example {
     
        public static void main(String[] args) throws Exception {
                // parse input arguments
                final ParameterTool parameterTool = ParameterTool.fromArgs(args);
     
    -           if(parameterTool.getNumberOfParameters() < 4) {
    -                   System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
    -                                   "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
    +           if (parameterTool.getNumberOfParameters() < 5) {
    +                   System.out.println("Missing parameters!\n" +
    +                                   "Usage: Kafka --input-topic <topic> --output-topic <topic> " +
    +                                   "--bootstrap.servers <kafka brokers> " +
    +                                   "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]");
                        return;
                }
     
    +           String prefix = parameterTool.get("prefix", "PREFIX:");
    +
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().disableSysoutLogging();
                env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
    -           env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
    -           env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
    +//         env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
    --- End diff --
    
    It wasn't working because of https://issues.apache.org/jira/browse/FLINK-6515, which is now fixed, so I will change that.
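The new example resolves its parameters (such as `--input-topic` and the defaulted `--prefix`) through Flink's ParameterTool, e.g. `parameterTool.get("prefix", "PREFIX:")`. As a rough standalone sketch of that `--key value` parsing pattern (the `ArgsSketch` class and its `parseArgs` helper are hypothetical illustrations, not Flink's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "--key value" argument-parsing pattern that
// ParameterTool.fromArgs provides for the example; not Flink code.
public class ArgsSketch {

    // Collect successive "--key value" pairs into a map.
    static Map<String, String> parseArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (args[i].startsWith("--")) {
                params.put(args[i].substring(2), args[i + 1]);
            }
        }
        return params;
    }

    public static void main(String[] args) {
        String[] demo = {"--input-topic", "test-input", "--output-topic", "test-output"};
        Map<String, String> params = parseArgs(demo);
        // Falling back to a default mirrors parameterTool.get("prefix", "PREFIX:").
        String prefix = params.getOrDefault("prefix", "PREFIX:");
        System.out.println(prefix + params.get("input-topic")); // prints "PREFIX:test-input"
    }
}
```

In the real example the same defaulting lets `--prefix` stay optional, which is why the usage message lists it in brackets.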


> Add automated end-to-end tests
> ------------------------------
>
>                 Key: FLINK-6539
>                 URL: https://issues.apache.org/jira/browse/FLINK-6539
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> We should add simple tests that exercise all the paths that a user would use 
> when starting a cluster and submitting a program. Preferably with a simple 
> batch program and a streaming program that uses Kafka.
> This would have caught some of the bugs that we only discovered right before
> the release.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
