[ 
https://issues.apache.org/jira/browse/FLINK-2585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14716865#comment-14716865
 ] 

Boyang Jerry Peng commented on FLINK-2585:
------------------------------------------

The pom I am using to compile

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>yahoo-low-latency-bechmarks</artifactId>
        <groupId>org.apache.storm</groupId>
        <version>0.1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink-benchmarks</artifactId>
    <packaging>jar</packaging>

    <properties>
        <log4j.configuration>log4j-test.properties</log4j.configuration>
        <slf4j.version>1.7.7</slf4j.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-core</artifactId>
            <version>0.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>0.9.0</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.5</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>0.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <type>jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <type>jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-module-junit4</artifactId>
            <version>1.5.5</version>
            <type>jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-api-mockito</artifactId>
            <version>1.5.5</version>
            <type>jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <version>1.3</version>
            <type>jar</type>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.9</version>
                <executions>
                    <execution>
                        <id>unpack</id>
                        <!-- executed just before the package phase -->
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>unpack</goal>
                        </goals>
                        <configuration>
                            <artifactItems>
                                <!-- For Flink connector classes -->
                                <artifactItem>
                                    <groupId>org.apache.flink</groupId>
                                    
<artifactId>flink-connector-kafka</artifactId>
                                    <version>0.9.0</version>
                                    <type>jar</type>
                                    <overWrite>false</overWrite>
                                    
<outputDirectory>${project.build.directory}/classes</outputDirectory>
                                    <includes>org/apache/flink/**</includes>
                                </artifactItem>
                                <!-- For Kafka API classes -->
                                <artifactItem>
                                    <groupId>org.apache.kafka</groupId>
                                    <artifactId>kafka_2.10</artifactId>
                                    <version>0.8.2.1 </version>
                                    <type>jar</type>
                                    <overWrite>false</overWrite>
                                    
<outputDirectory>${project.build.directory}/classes</outputDirectory>
                                    <includes>kafka/**</includes>
                                </artifactItem>

                                <artifactItem>
                                    <groupId>com.101tec</groupId>
                                    <artifactId>zkclient</artifactId>
                                    <version>0.5</version>
                                    <type>jar</type>
                                    <overWrite>false</overWrite>
                                    
<outputDirectory>${project.build.directory}/classes</outputDirectory>
                                    <includes>**/*.class</includes>
                                </artifactItem>

                                <artifactItem>
                                    <groupId>com.yammer.metrics</groupId>
                                    <artifactId>metrics-core</artifactId>
                                    <version>2.2.0</version>
                                    <type>jar</type>
                                    <overWrite>false</overWrite>
                                    
<outputDirectory>${project.build.directory}/classes</outputDirectory>
                                    <includes>**/*.class</includes>
                                </artifactItem>
                            </artifactItems>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version><!--$NO-MVN-MAN-VER$-->
                <executions>

                    <!-- StreamingWordCount -->
                    <execution>
                        <id>WordCount</id>
                        <phase>package</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                        <configuration>
                            <classifier>StreamingWordCount</classifier>

                            <archive>
                                <manifestEntries>
                                    
<program-class>flink.benchmark.StreamingWordCount</program-class>
                                </manifestEntries>
                            </archive>

                            <includes>
                                
<include>flink/benchmark/StreamingWordCount.class</include>
                                
<include>flink/benchmark/StreamingWordCount*.class</include>
                            </includes>
                        </configuration>
                    </execution>
                    <!-- KafkaDataProcessor -->
                    <execution>
                        <id>KafkaDataProcessor</id>
                        <phase>package</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                        <configuration>
                            <classifier>KafkaDataProcessor</classifier>

                            <archive>
                                <manifestEntries>
                                    
<program-class>flink.benchmark.KafkaDataProcessor</program-class>
                                </manifestEntries>
                            </archive>

                            <includes>
                                
<include>flink/benchmark/KafkaDataProcessor.class</include>
                                
<include>flink/benchmark/KafkaDataProcessor*.class</include>
                                
<include>org/apache/flink/streaming/connectors/kafka/api/Kafka*.class</include>
                                
<include>kafka/javaapi/consumer/ConsumerConnector*.class</include>
                                <include>kafka/consumer/*.class</include>
                                <include>kafka/utils/*.class</include>
                                <include>kafka/*.class</include>
                                <include>kafka/common/*.class</include>
                                <include>kafka/**</include>
                                <include>org/I0Itec/zkclient/**</include>
                                <include>com/yammer/metrics/**</include>
                            </includes>
                        </configuration>
                    </execution>
                    <!-- KafkaConsumerExample -->
                    <execution>
                        <id>KafkaConsumerExample</id>
                        <phase>package</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                        <configuration>
                            <classifier>KafkaConsumerExample</classifier>

                            <archive>
                                <manifestEntries>
                                    
<program-class>flink.benchmark.KafkaConsumerExample</program-class>
                                </manifestEntries>
                            </archive>

                            <includes>
                                
<include>flink/benchmark/KafkaConsumerExample.class</include>
                                
<include>flink/benchmark/KafkaConsumerExample*.class</include>
                                
<include>org/apache/flink/streaming/connectors/kafka/api/Kafka*.class</include>
                                
<include>org/apache/flink/streaming/connectors/kafka/api/KafkaSource.class</include>
                                
<include>kafka/javaapi/consumer/ConsumerConnector*.class</include>
                                <include>kafka/consumer/*.class</include>
                                <include>kafka/utils/*.class</include>
                                <include>kafka/*.class</include>
                                <include>kafka/common/*.class</include>
                                <include>kafka/**</include>
                                <include>org/I0Itec/zkclient/**</include>
                                <include>com/yammer/metrics/**</include>
                            </includes>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

> KafkaSource not working
> -----------------------
>
>                 Key: FLINK-2585
>                 URL: https://issues.apache.org/jira/browse/FLINK-2585
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Boyang Jerry Peng
>
> I tried running the KafkaConsumerExample that is subscribing to a 
> console producer of kafka but the KafkaConsumerExample topology was not 
> receiving any data from Kafka.  Then I wrote my own topology that uses Kafka 
> as a source but it didn't work either.  The topologies would run but receive 
> no data.  If I run a console consumer that subscribes to the topic of the 
> console producer, the console consumer receives data from the producer which 
> indicates the producer is working correctly.  Can someone help me with this 
> problem?  
> Kafka console producer I am running:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
> The flink code I am running:
> {code}
> public class KafkaDataProcessor {
>       private static int port;
>       private static String hostname;
>       private static String topic;
>       private static final Logger LOG = 
> LoggerFactory.getLogger(KafkaDataProcessor.class);
>       public static void main(String[] args) {
>               if (!parseParameters(args)) {
>                       return;
>               }
>               System.out.println("Start listening for data on: " + hostname + 
> ":" + port + " for topic: " + topic);
>               StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>               DataStream<Tuple2<String, Integer>> dataStream = env
>                                               .addSource(new 
> KafkaSource(hostname + ":" + port, topic, "test-consumer-group", new 
> SimpleStringSchema(), 200L), "Kafka source").setParallelism(2)
>                                                                               
> .flatMap(new Splitter()).setParallelism(2)
>                                                                               
> .groupBy(0)
>                                                                               
> .sum(1).setParallelism(2);
>               dataStream.print().setParallelism(2);
>               try {
>                       env.execute("kafka processor");
>               } catch (Exception e) {
>                       e.printStackTrace();
>               }
>       }
>       public static class Splitter implements FlatMapFunction<String, 
> Tuple2<String, Integer>> {
>               @Override
>               public void flatMap(String sentence, Collector<Tuple2<String, 
> Integer>> out) throws Exception {
>                       for (String word : sentence.split(" ")) {
>                               System.out.println("word: " + word);
>                               LOG.info("word: {}", word);
>                               out.collect(new Tuple2<String, Integer>(word, 
> 1));
>                       }
>               }
>       }
>       private static boolean parseParameters(String[] args) {
>               if (args.length > 0) {
>                       if (args.length == 3) {
>                               hostname = args[0];
>                               port = Integer.parseInt(args[1]);
>                               topic = args[2];
>                       } else {
>                               System.err.println("Usage: KafkaDataProcessor 
> <hostname> <Port> <topic>");
>                               return false;
>                       }
>               } else {
>                       System.out.println("Executing KafkaDataProcessor 
> example with built-in default data.");
>                       System.out.println("  Provide Hostname and Port to read 
> input data from.");
>                       System.out.println("  Usage: KafkaDataProcessor 
> <Hostname> <Port> <topic>");
>                       return false;
>               }
>               return true;
>       }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to