[
https://issues.apache.org/jira/browse/FLINK-2585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14716865#comment-14716865
]
Boyang Jerry Peng commented on FLINK-2585:
------------------------------------------
The pom I am using to compile:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yahoo-low-latency-bechmarks</artifactId>
<groupId>org.apache.storm</groupId>
<version>0.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-benchmarks</artifactId>
<packaging>jar</packaging>
<properties>
<log4j.configuration>log4j-test.properties</log4j.configuration>
<slf4j.version>1.7.7</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.5</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>1.5.5</version>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<version>1.5.5</version>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<type>jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.9</version>
<executions>
<execution>
<id>unpack</id>
<!-- executed just before the package phase -->
<phase>prepare-package</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<!-- For Flink connector classes -->
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.9.0</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/**</includes>
</artifactItem>
<!-- For Kafka API classes -->
<artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1 </version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>kafka/**</includes>
</artifactItem>
<artifactItem>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.5</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>**/*.class</includes>
</artifactItem>
<artifactItem>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>**/*.class</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version><!--$NO-MVN-MAN-VER$-->
<executions>
<!-- StreamingWordCount -->
<execution>
<id>WordCount</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>StreamingWordCount</classifier>
<archive>
<manifestEntries>
<program-class>flink.benchmark.StreamingWordCount</program-class>
</manifestEntries>
</archive>
<includes>
<include>flink/benchmark/StreamingWordCount.class</include>
<include>flink/benchmark/StreamingWordCount*.class</include>
</includes>
</configuration>
</execution>
<!-- KafkaDataProcessor -->
<execution>
<id>KafkaDataProcessor</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>KafkaDataProcessor</classifier>
<archive>
<manifestEntries>
<program-class>flink.benchmark.KafkaDataProcessor</program-class>
</manifestEntries>
</archive>
<includes>
<include>flink/benchmark/KafkaDataProcessor.class</include>
<include>flink/benchmark/KafkaDataProcessor*.class</include>
<include>org/apache/flink/streaming/connectors/kafka/api/Kafka*.class</include>
<include>kafka/javaapi/consumer/ConsumerConnector*.class</include>
<include>kafka/consumer/*.class</include>
<include>kafka/utils/*.class</include>
<include>kafka/*.class</include>
<include>kafka/common/*.class</include>
<include>kafka/**</include>
<include>org/I0Itec/zkclient/**</include>
<include>com/yammer/metrics/**</include>
</includes>
</configuration>
</execution>
<!-- KafkaConsumerExample -->
<execution>
<id>KafkaConsumerExample</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>KafkaConsumerExample</classifier>
<archive>
<manifestEntries>
<program-class>flink.benchmark.KafkaConsumerExample</program-class>
</manifestEntries>
</archive>
<includes>
<include>flink/benchmark/KafkaConsumerExample.class</include>
<include>flink/benchmark/KafkaConsumerExample*.class</include>
<include>org/apache/flink/streaming/connectors/kafka/api/Kafka*.class</include>
<include>org/apache/flink/streaming/connectors/kafka/api/KafkaSource.class</include>
<include>kafka/javaapi/consumer/ConsumerConnector*.class</include>
<include>kafka/consumer/*.class</include>
<include>kafka/utils/*.class</include>
<include>kafka/*.class</include>
<include>kafka/common/*.class</include>
<include>kafka/**</include>
<include>org/I0Itec/zkclient/**</include>
<include>com/yammer/metrics/**</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
> KafkaSource not working
> -----------------------
>
> Key: FLINK-2585
> URL: https://issues.apache.org/jira/browse/FLINK-2585
> Project: Flink
> Issue Type: Bug
> Reporter: Boyang Jerry Peng
>
> I tried running the KafkaConsumerExample, which is subscribing to a
> console producer of Kafka, but the KafkaConsumerExample topology was not
> receiving any data from Kafka. Then I wrote my own topology that uses Kafka
> as a source, but it didn't work either. The topologies would run but receive
> no data. If I run a console consumer that subscribes to the topic of the
> console producer, the console consumer receives data from the producer, which
> indicates the producer is working correctly. Can someone help me with this
> problem?
> Kafka console producer I am running:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> The Flink code I am running:
> {code}
> public class KafkaDataProcessor {
> private static int port;
> private static String hostname;
> private static String topic;
> private static final Logger LOG =
> LoggerFactory.getLogger(KafkaDataProcessor.class);
> public static void main(String[] args) {
> if (!parseParameters(args)) {
> return;
> }
> System.out.println("Start listening for data on: " + hostname +
> ":" + port + " for topic: " + topic);
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<Tuple2<String, Integer>> dataStream = env
> .addSource(new
> KafkaSource(hostname + ":" + port, topic, "test-consumer-group", new
> SimpleStringSchema(), 200L), "Kafka source").setParallelism(2)
>
> .flatMap(new Splitter()).setParallelism(2)
>
> .groupBy(0)
>
> .sum(1).setParallelism(2);
> dataStream.print().setParallelism(2);
> try {
> env.execute("kafka processor");
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> public static class Splitter implements FlatMapFunction<String,
> Tuple2<String, Integer>> {
> @Override
> public void flatMap(String sentence, Collector<Tuple2<String,
> Integer>> out) throws Exception {
> for (String word : sentence.split(" ")) {
> System.out.println("word: " + word);
> LOG.info("word: {}", word);
> out.collect(new Tuple2<String, Integer>(word,
> 1));
> }
> }
> }
> private static boolean parseParameters(String[] args) {
> if (args.length > 0) {
> if (args.length == 3) {
> hostname = args[0];
> port = Integer.parseInt(args[1]);
> topic = args[2];
> } else {
> System.err.println("Usage: KafkaDataProcessor
> <hostname> <Port> <topic>");
> return false;
> }
> } else {
> System.out.println("Executing KafkaDataProcessor
> example with built-in default data.");
> System.out.println(" Provide Hostname and Port to read
> input data from.");
> System.out.println(" Usage: KafkaDataProcessor
> <Hostname> <Port> <topic>");
> return false;
> }
> return true;
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)