[ https://issues.apache.org/jira/browse/FLINK-2585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14716917#comment-14716917 ]
Stephan Ewen commented on FLINK-2585: ------------------------------------- The {{print()}} method in the batch API prints on the client, but the {{print()}} method in the streaming API prints on the workers. We are exploring a few ways to stream the data back to the client. > KafkaSource not working > ----------------------- > > Key: FLINK-2585 > URL: https://issues.apache.org/jira/browse/FLINK-2585 > Project: Flink > Issue Type: Bug > Reporter: Boyang Jerry Peng > > I tried running the KafkaConsumerExample that is subscribing to a > console producer of Kafka, but the KafkaConsumerExample topology was not > receiving any data from Kafka. Then I wrote my own topology that uses Kafka > as a source, but it didn't work either. The topologies would run but receive > no data. If I run a console consumer that subscribes to the topic of the > console producer, the console consumer receives data from the producer, which > indicates the producer is working correctly. Can someone help me with this > problem? 
> Kafka console producer I am running: > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test > The flink code I am running: > {code} > public class KafkaDataProcessor { > private static int port; > private static String hostname; > private static String topic; > private static final Logger LOG = > LoggerFactory.getLogger(KafkaDataProcessor.class); > public static void main(String[] args) { > if (!parseParameters(args)) { > return; > } > System.out.println("Start listening for data on: " + hostname + > ":" + port + " for topic: " + topic); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStream<Tuple2<String, Integer>> dataStream = env > .addSource(new > KafkaSource(hostname + ":" + port, topic, "test-consumer-group", new > SimpleStringSchema(), 200L), "Kafka source").setParallelism(2) > > .flatMap(new Splitter()).setParallelism(2) > > .groupBy(0) > > .sum(1).setParallelism(2); > dataStream.print().setParallelism(2); > try { > env.execute("kafka processor"); > } catch (Exception e) { > e.printStackTrace(); > } > } > public static class Splitter implements FlatMapFunction<String, > Tuple2<String, Integer>> { > @Override > public void flatMap(String sentence, Collector<Tuple2<String, > Integer>> out) throws Exception { > for (String word : sentence.split(" ")) { > System.out.println("word: " + word); > LOG.info("word: {}", word); > out.collect(new Tuple2<String, Integer>(word, > 1)); > } > } > } > private static boolean parseParameters(String[] args) { > if (args.length > 0) { > if (args.length == 3) { > hostname = args[0]; > port = Integer.parseInt(args[1]); > topic = args[2]; > } else { > System.err.println("Usage: KafkaDataProcessor > <hostname> <Port> <topic>"); > return false; > } > } else { > System.out.println("Executing KafkaDataProcessor > example with built-in default data."); > System.out.println(" Provide Hostname and Port to read > input data from."); > System.out.println(" Usage: 
KafkaDataProcessor > <Hostname> <Port> <topic>"); > return false; > } > return true; > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)