[
https://issues.apache.org/jira/browse/FLINK-2585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Boyang Jerry Peng updated FLINK-2585:
-------------------------------------
Description:
I tried running the KafkaConsumerExample subscribed to a Kafka command-line producer,
but the KafkaConsumerExample topology was not receiving any data from Kafka. Then I
wrote my own topology that uses Kafka as a source, but it didn't work either. The
topologies would run but receive no data.
Can someone help me with this problem?
Kafka console producer I am running:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
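As a sanity check, a minimal standalone consumer along the lines of the sketch below can confirm that the produced messages actually reach the "test" topic independently of Flink. This sketch is not part of the original report; it assumes a recent kafka-clients dependency on the classpath, and the bootstrap address and topic simply mirror the producer command above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical helper, not from the original report: polls the "test" topic
// for a few seconds and prints whatever the console producer wrote to it.
public class TopicSanityCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // same broker as the console producer
        props.put("group.id", "topic-sanity-check");
        props.put("auto.offset.reset", "earliest");         // also read messages produced before startup
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset=" + record.offset() + " value=" + record.value());
                }
            }
        }
    }
}

If this consumer prints the produced lines while the Flink job still sees nothing, the problem is most likely on the Flink/connector side rather than on the producer side.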
The Flink code I am running:
// Imports added for completeness; the Kafka-related package names assume the
// legacy flink-connector-kafka KafkaSource API used below.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaDataProcessor {

    private static int port;
    private static String hostname;
    private static String topic;

    private static final Logger LOG = LoggerFactory.getLogger(KafkaDataProcessor.class);

    public static void main(String[] args) {
        if (!parseParameters(args)) {
            return;
        }

        System.out.println("Start listening for data on: " + hostname + ":" + port
                + " for topic: " + topic);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Word-count pipeline: Kafka source -> split lines into words -> group by word -> running sum.
        DataStream<Tuple2<String, Integer>> dataStream = env
                .addSource(new KafkaSource<String>(hostname + ":" + port, topic,
                        "test-consumer-group", new SimpleStringSchema(), 200L), "Kafka source")
                .setParallelism(2)
                .flatMap(new Splitter()).setParallelism(2)
                .groupBy(0)
                .sum(1).setParallelism(2);

        dataStream.print().setParallelism(2);

        try {
            env.execute("kafka processor");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Splits each incoming line into words and emits (word, 1) pairs.
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                System.out.println("word: " + word);
                LOG.info("word: {}", word);
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    // Expects exactly three arguments: hostname, port, and topic.
    private static boolean parseParameters(String[] args) {
        if (args.length > 0) {
            if (args.length == 3) {
                hostname = args[0];
                port = Integer.parseInt(args[1]);
                topic = args[2];
            } else {
                System.err.println("Usage: KafkaDataProcessor <hostname> <port> <topic>");
                return false;
            }
        } else {
            System.out.println("Executing KafkaDataProcessor example with built-in default data.");
            System.out.println("  Provide hostname and port to read input data from.");
            System.out.println("  Usage: KafkaDataProcessor <hostname> <port> <topic>");
            return false;
        }
        return true;
    }
}
was:
I tried running the KafkaConsumerExample subscribed to a Kafka command-line producer,
but the KafkaConsumerExample topology was not receiving any data from Kafka. Then I
wrote my own topology that uses Kafka as a source, but it didn't work either. The
topologies would run but receive no data.
Can someone help me with this problem? The code I am running:
> KafkaSource not working
> -----------------------
>
> Key: FLINK-2585
> URL: https://issues.apache.org/jira/browse/FLINK-2585
> Project: Flink
> Issue Type: Bug
> Reporter: Boyang Jerry Peng
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)