[ https://issues.apache.org/jira/browse/FLINK-2585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14717388#comment-14717388 ]

Robert Metzger commented on FLINK-2585:
---------------------------------------

[~StephanEwen]: Kafka is only logging the exception (and catching Throwable in Scala):

{code}
 } catch {
        case t: Throwable => {
            if (!isRunning.get())
              throw t /* If this thread is stopped, propagate this exception to kill the thread. */
            else
              warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
          }
      } finally {
{code}
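
Since the failed lookup only surfaces through that warn() call, a quick way to confirm this is the cause is to grep the TaskManager log for the warning message from the snippet above. A rough sketch (the log file path is just an example and depends on the installation):

{code}
# Search the TaskManager log for the swallowed leader-lookup warning
# (adjust the log path / file name pattern to your setup)
grep "Failed to find leader for" log/flink-*-taskmanager-*.log
{code}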

> KafkaSource not working
> -----------------------
>
>                 Key: FLINK-2585
>                 URL: https://issues.apache.org/jira/browse/FLINK-2585
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Boyang Jerry Peng
>
> I tried running the KafkaConsumerExample that subscribes to a Kafka console 
> producer, but the KafkaConsumerExample topology was not receiving any data 
> from Kafka.  Then I wrote my own topology that uses Kafka as a source, but 
> it did not work either.  The topologies would run but receive no data.  If I 
> run a console consumer that subscribes to the topic of the console producer, 
> the console consumer receives data from the producer, which indicates the 
> producer is working correctly.  Can someone help me with this problem?
> Kafka console producer I am running:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
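> For reference, the console-consumer check referred to above looks roughly like this on a 0.8.x broker (ZooKeeper assumed on localhost:2181):
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning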
> The flink code I am running:
> {code}
> public class KafkaDataProcessor {
>     private static int port;
>     private static String hostname;
>     private static String topic;
>     private static final Logger LOG = LoggerFactory.getLogger(KafkaDataProcessor.class);
>
>     public static void main(String[] args) {
>         if (!parseParameters(args)) {
>             return;
>         }
>         System.out.println("Start listening for data on: " + hostname + ":" + port + " for topic: " + topic);
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         DataStream<Tuple2<String, Integer>> dataStream = env
>                 .addSource(new KafkaSource(hostname + ":" + port, topic, "test-consumer-group", new SimpleStringSchema(), 200L), "Kafka source").setParallelism(2)
>                 .flatMap(new Splitter()).setParallelism(2)
>                 .groupBy(0)
>                 .sum(1).setParallelism(2);
>         dataStream.print().setParallelism(2);
>         try {
>             env.execute("kafka processor");
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
>     public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
>             for (String word : sentence.split(" ")) {
>                 System.out.println("word: " + word);
>                 LOG.info("word: {}", word);
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
>
>     private static boolean parseParameters(String[] args) {
>         if (args.length > 0) {
>             if (args.length == 3) {
>                 hostname = args[0];
>                 port = Integer.parseInt(args[1]);
>                 topic = args[2];
>             } else {
>                 System.err.println("Usage: KafkaDataProcessor <hostname> <Port> <topic>");
>                 return false;
>             }
>         } else {
>             System.out.println("Executing KafkaDataProcessor example with built-in default data.");
>             System.out.println("  Provide Hostname and Port to read input data from.");
>             System.out.println("  Usage: KafkaDataProcessor <Hostname> <Port> <topic>");
>             return false;
>         }
>         return true;
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
