[
https://issues.apache.org/jira/browse/FLINK-2585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Robert Metzger resolved FLINK-2585.
-----------------------------------
Resolution: Not A Problem
> KafkaSource not working
> -----------------------
>
> Key: FLINK-2585
> URL: https://issues.apache.org/jira/browse/FLINK-2585
> Project: Flink
> Issue Type: Bug
> Reporter: Boyang Jerry Peng
>
> I tried running the KafkaConsumerExample with that is subscribing to a
> console producer of kafka but the KafkaConsumerExample topology was not
> receiving any data from Kafka. Then I wrote my own topology that uses Kafka
> as a source but it didn't work as well. The topologies would run but receive
> not data. If I run a console consumer that subscibes to the topic of the
> console producer, the console consumer receives data from the producer which
> indicates the producer is working correctly. Can someone help me with this
> problem?
> Kafka console producer I am running:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> The flink code I am running:
> {code}
> public class KafkaDataProcessor {
> private static int port;
> private static String hostname;
> private static String topic;
> private static final Logger LOG =
> LoggerFactory.getLogger(KafkaDataProcessor.class);
> public static void main(String[] args) {
> if (!parseParameters(args)) {
> return;
> }
> System.out.println("Start listening for data on: " + hostname +
> ":" + port + " for topic: " + topic);
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<Tuple2<String, Integer>> dataStream = env
> .addSource(new
> KafkaSource(hostname + ":" + port, topic, "test-consumer-group", new
> SimpleStringSchema(), 200L), "Kafka source").setParallelism(2)
>
> .flatMap(new Splitter()).setParallelism(2)
>
> .groupBy(0)
>
> .sum(1).setParallelism(2);
> dataStream.print().setParallelism(2);
> try {
> env.execute("kafka processor");
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> public static class Splitter implements FlatMapFunction<String,
> Tuple2<String, Integer>> {
> @Override
> public void flatMap(String sentence, Collector<Tuple2<String,
> Integer>> out) throws Exception {
> for (String word : sentence.split(" ")) {
> System.out.println("word: " + word);
> LOG.info("word: {}", word);
> out.collect(new Tuple2<String, Integer>(word,
> 1));
> }
> }
> }
> private static boolean parseParameters(String[] args) {
> if (args.length > 0) {
> if (args.length == 3) {
> hostname = args[0];
> port = Integer.parseInt(args[1]);
> topic = args[2];
> } else {
> System.err.println("Usage: KafkaDataProcessor
> <hostname> <Port> <topic>");
> return false;
> }
> } else {
> System.out.println("Executing KafkaDataProcessor
> example with built-in default data.");
> System.out.println(" Provide Hostname and Port to read
> input data from.");
> System.out.println(" Usage: KafkaDataProcessor
> <Hostname> <Port> <topic>");
> return false;
> }
> return true;
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)