[ https://issues.apache.org/jira/browse/FLINK-2585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boyang Jerry Peng updated FLINK-2585: ------------------------------------- Description: I tried running the KafkaConsumerExample subscribed to a Kafka command-line producer, but the KafkaConsumerExample topology was not receiving any data from Kafka. Then I wrote my own topology that uses Kafka as a source, but it didn't work either. The topologies would run but receive no data. Can someone help me with this problem? Kafka console producer I am running: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test The Flink code I am running: public class KafkaDataProcessor { private static int port; private static String hostname; private static String topic; private static final Logger LOG = LoggerFactory.getLogger(KafkaDataProcessor.class); public static void main(String[] args) { if (!parseParameters(args)) { return; } System.out.println("Start listening for data on: " + hostname + ":" + port + " for topic: " + topic); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> dataStream = env .addSource(new KafkaSource(hostname + ":" + port, topic, "test-consumer-group", new SimpleStringSchema(), 200L), "Kafka source").setParallelism(2) .flatMap(new Splitter()).setParallelism(2) .groupBy(0) .sum(1).setParallelism(2); dataStream.print().setParallelism(2); try { env.execute("kafka processor"); } catch (Exception e) { e.printStackTrace(); } } public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word : sentence.split(" ")) { System.out.println("word: " + word); LOG.info("word: {}", word); out.collect(new Tuple2<String, Integer>(word, 1)); } } } private static boolean parseParameters(String[] args) { if (args.length > 0) { if (args.length == 3) { hostname = args[0]; port = Integer.parseInt(args[1]); topic = args[2]; } 
else { System.err.println("Usage: KafkaDataProcessor <hostname> <Port> <topic>"); return false; } } else { System.out.println("Executing KafkaDataProcessor example with built-in default data."); System.out.println(" Provide Hostname and Port to read input data from."); System.out.println(" Usage: KafkaDataProcessor <Hostname> <Port> <topic>"); return false; } return true; } } was: I tried running the KafkaConsumerExample subscribed to a Kafka command-line producer, but the KafkaConsumerExample topology was not receiving any data from Kafka. Then I wrote my own topology that uses Kafka as a source, but it didn't work either. The topologies would run but receive no data. Can someone help me with this problem? The code I am running: > KafkaSource not working > ----------------------- > > Key: FLINK-2585 > URL: https://issues.apache.org/jira/browse/FLINK-2585 > Project: Flink > Issue Type: Bug > Reporter: Boyang Jerry Peng > > I tried running the KafkaConsumerExample subscribed to a Kafka > command-line producer, but the KafkaConsumerExample topology was not > receiving any data from Kafka. Then I wrote my own topology that uses Kafka > as a source, but it didn't work either. The topologies would run but receive > no data. Can someone help me with this problem? 
> Kafka console producer I am running: > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test > The flink code I am running: > public class KafkaDataProcessor { > private static int port; > private static String hostname; > private static String topic; > private static final Logger LOG = > LoggerFactory.getLogger(KafkaDataProcessor.class); > public static void main(String[] args) { > if (!parseParameters(args)) { > return; > } > System.out.println("Start listening for data on: " + hostname + > ":" + port + " for topic: " + topic); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStream<Tuple2<String, Integer>> dataStream = env > .addSource(new > KafkaSource(hostname + ":" + port, topic, "test-consumer-group", new > SimpleStringSchema(), 200L), "Kafka source").setParallelism(2) > > .flatMap(new Splitter()).setParallelism(2) > > .groupBy(0) > > .sum(1).setParallelism(2); > dataStream.print().setParallelism(2); > try { > env.execute("kafka processor"); > } catch (Exception e) { > e.printStackTrace(); > } > } > public static class Splitter implements FlatMapFunction<String, > Tuple2<String, Integer>> { > @Override > public void flatMap(String sentence, Collector<Tuple2<String, > Integer>> out) throws Exception { > for (String word : sentence.split(" ")) { > System.out.println("word: " + word); > LOG.info("word: {}", word); > out.collect(new Tuple2<String, Integer>(word, > 1)); > } > } > } > private static boolean parseParameters(String[] args) { > if (args.length > 0) { > if (args.length == 3) { > hostname = args[0]; > port = Integer.parseInt(args[1]); > topic = args[2]; > } else { > System.err.println("Usage: KafkaDataProcessor > <hostname> <Port> <topic>"); > return false; > } > } else { > System.out.println("Executing KafkaDataProcessor > example with built-in default data."); > System.out.println(" Provide Hostname and Port to read > input data from."); > System.out.println(" Usage: 
KafkaDataProcessor > <Hostname> <Port> <topic>"); > return false; > } > return true; > } > } -- This message was sent by Atlassian JIRA (v6.3.4#6332)