[ 
https://issues.apache.org/jira/browse/FLINK-2585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Jerry Peng updated FLINK-2585:
-------------------------------------
    Description: 
I tried running the KafkaConsumerExample subscribed to the topic of a Kafka 
console producer, but the KafkaConsumerExample topology did not receive any 
data from Kafka.  Then I wrote my own topology that uses Kafka as a source, but 
it did not work either.  The topologies run but receive no data.  If I run a 
console consumer that subscribes to the topic of the console producer, the 
console consumer receives data from the producer, which indicates that the 
producer is working correctly.  Can someone help me with this problem?  

Kafka console producer I am running:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

The flink code I am running:

public class KafkaDataProcessor {
        private static int port;
        private static String hostname;
        private static String topic;
        private static final Logger LOG = 
LoggerFactory.getLogger(KafkaDataProcessor.class);

        public static void main(String[] args) {
                if (!parseParameters(args)) {
                        return;
                }
                System.out.println("Start listening for data on: " + hostname + 
":" + port + " for topic: " + topic);
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();


                DataStream<Tuple2<String, Integer>> dataStream = env
                                                .addSource(new 
KafkaSource(hostname + ":" + port, topic, "test-consumer-group", new 
SimpleStringSchema(), 200L), "Kafka source").setParallelism(2)
                                                                                
.flatMap(new Splitter()).setParallelism(2)
                                                                                
.groupBy(0)
                                                                                
.sum(1).setParallelism(2);


                dataStream.print().setParallelism(2);



                try {
                        env.execute("kafka processor");
                } catch (Exception e) {
                        e.printStackTrace();
                }

        }

        public static class Splitter implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
                @Override
                public void flatMap(String sentence, Collector<Tuple2<String, 
Integer>> out) throws Exception {
                        for (String word : sentence.split(" ")) {
                                System.out.println("word: " + word);
                                LOG.info("word: {}", word);
                                out.collect(new Tuple2<String, Integer>(word, 
1));
                        }
                }
        }

        private static boolean parseParameters(String[] args) {

                if (args.length > 0) {
                        if (args.length == 3) {
                                hostname = args[0];
                                port = Integer.parseInt(args[1]);
                                topic = args[2];
                        } else {
                                System.err.println("Usage: KafkaDataProcessor 
<hostname> <Port> <topic>");
                                return false;
                        }
                } else {
                        System.out.println("Executing KafkaDataProcessor 
example with built-in default data.");
                        System.out.println("  Provide Hostname and Port to read 
input data from.");
                        System.out.println("  Usage: KafkaDataProcessor 
<Hostname> <Port> <topic>");
                        return false;
                }
                return true;
        }
}

  was:
I tried running the KafkaConsumerExample subscribed to the topic of a Kafka 
command-line producer, but the KafkaConsumerExample topology did not receive 
any data from Kafka.  Then I wrote my own topology that uses Kafka as a source, 
but it did not work either.  The topologies run but receive no data.  
Can someone help me with this problem?  

Kafka console producer I am running:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

The flink code I am running:

public class KafkaDataProcessor {
        private static int port;
        private static String hostname;
        private static String topic;
        private static final Logger LOG = 
LoggerFactory.getLogger(KafkaDataProcessor.class);

        public static void main(String[] args) {
                if (!parseParameters(args)) {
                        return;
                }
                System.out.println("Start listening for data on: " + hostname + 
":" + port + " for topic: " + topic);
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();


                DataStream<Tuple2<String, Integer>> dataStream = env
                                                .addSource(new 
KafkaSource(hostname + ":" + port, topic, "test-consumer-group", new 
SimpleStringSchema(), 200L), "Kafka source").setParallelism(2)
                                                                                
.flatMap(new Splitter()).setParallelism(2)
                                                                                
.groupBy(0)
                                                                                
.sum(1).setParallelism(2);


                dataStream.print().setParallelism(2);



                try {
                        env.execute("kafka processor");
                } catch (Exception e) {
                        e.printStackTrace();
                }

        }

        public static class Splitter implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
                @Override
                public void flatMap(String sentence, Collector<Tuple2<String, 
Integer>> out) throws Exception {
                        for (String word : sentence.split(" ")) {
                                System.out.println("word: " + word);
                                LOG.info("word: {}", word);
                                out.collect(new Tuple2<String, Integer>(word, 
1));
                        }
                }
        }

        private static boolean parseParameters(String[] args) {

                if (args.length > 0) {
                        if (args.length == 3) {
                                hostname = args[0];
                                port = Integer.parseInt(args[1]);
                                topic = args[2];
                        } else {
                                System.err.println("Usage: KafkaDataProcessor 
<hostname> <Port> <topic>");
                                return false;
                        }
                } else {
                        System.out.println("Executing KafkaDataProcessor 
example with built-in default data.");
                        System.out.println("  Provide Hostname and Port to read 
input data from.");
                        System.out.println("  Usage: KafkaDataProcessor 
<Hostname> <Port> <topic>");
                        return false;
                }
                return true;
        }
}


> KafkaSource not working
> -----------------------
>
>                 Key: FLINK-2585
>                 URL: https://issues.apache.org/jira/browse/FLINK-2585
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Boyang Jerry Peng
>
> I tried running the KafkaConsumerExample subscribed to the topic of a Kafka 
> console producer, but the KafkaConsumerExample topology did not receive any 
> data from Kafka.  Then I wrote my own topology that uses Kafka as a source, 
> but it did not work either.  The topologies run but receive no data.  If I 
> run a console consumer that subscribes to the topic of the console producer, 
> the console consumer receives data from the producer, which indicates that 
> the producer is working correctly.  Can someone help me with this 
> problem?  
> Kafka console producer I am running:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
> The flink code I am running:
> public class KafkaDataProcessor {
>       private static int port;
>       private static String hostname;
>       private static String topic;
>       private static final Logger LOG = 
> LoggerFactory.getLogger(KafkaDataProcessor.class);
>       public static void main(String[] args) {
>               if (!parseParameters(args)) {
>                       return;
>               }
>               System.out.println("Start listening for data on: " + hostname + 
> ":" + port + " for topic: " + topic);
>               StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>               DataStream<Tuple2<String, Integer>> dataStream = env
>                                               .addSource(new 
> KafkaSource(hostname + ":" + port, topic, "test-consumer-group", new 
> SimpleStringSchema(), 200L), "Kafka source").setParallelism(2)
>                                                                               
> .flatMap(new Splitter()).setParallelism(2)
>                                                                               
> .groupBy(0)
>                                                                               
> .sum(1).setParallelism(2);
>               dataStream.print().setParallelism(2);
>               try {
>                       env.execute("kafka processor");
>               } catch (Exception e) {
>                       e.printStackTrace();
>               }
>       }
>       public static class Splitter implements FlatMapFunction<String, 
> Tuple2<String, Integer>> {
>               @Override
>               public void flatMap(String sentence, Collector<Tuple2<String, 
> Integer>> out) throws Exception {
>                       for (String word : sentence.split(" ")) {
>                               System.out.println("word: " + word);
>                               LOG.info("word: {}", word);
>                               out.collect(new Tuple2<String, Integer>(word, 
> 1));
>                       }
>               }
>       }
>       private static boolean parseParameters(String[] args) {
>               if (args.length > 0) {
>                       if (args.length == 3) {
>                               hostname = args[0];
>                               port = Integer.parseInt(args[1]);
>                               topic = args[2];
>                       } else {
>                               System.err.println("Usage: KafkaDataProcessor 
> <hostname> <Port> <topic>");
>                               return false;
>                       }
>               } else {
>                       System.out.println("Executing KafkaDataProcessor 
> example with built-in default data.");
>                       System.out.println("  Provide Hostname and Port to read 
> input data from.");
>                       System.out.println("  Usage: KafkaDataProcessor 
> <Hostname> <Port> <topic>");
>                       return false;
>               }
>               return true;
>       }
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to