[ 
https://issues.apache.org/jira/browse/KAFKA-5732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137222#comment-16137222
 ] 

Ramkumar commented on KAFKA-5732:
---------------------------------

I found that this issue happens when the broker points to the old data log
directory (the one Kafka 0.8 was using). If I point it to a new directory, it
appears to work fine. Any pointers on why it does not work when referring to
the Kafka 0.8 data log folders, and how to make them compatible? I had set the
message format and protocol versions as below:
inter.broker.protocol.version=0.11.0
log.message.format.version=0.11.0
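
For comparison, the general Kafka upgrade notes describe a staged rolling upgrade in which both settings are first pinned to the version the cluster was already running and only raised afterwards. The fragment below is a sketch of that sequence for server.properties. The value 0.8.2 is an assumption (the exact 0.8.x release is not stated here), and whether this resolves the hang against the old log directories is not confirmed.

```properties
# Step 1: before swapping in the 0.11 binaries, pin both settings to the
# version the cluster currently runs. 0.8.2 is an assumption; use the
# exact 0.8.x release that wrote the existing log directories.
inter.broker.protocol.version=0.8.2
log.message.format.version=0.8.2

# Step 2: once every broker runs the 0.11 code, raise the protocol
# version and do another rolling restart.
# inter.broker.protocol.version=0.11.0

# Step 3: once the clients have been upgraded as well, raise the
# message format version and restart again.
# log.message.format.version=0.11.0
```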

> Kafka 0.11 Consumer.Poll() hangs for consumer.subscribe() 
> ----------------------------------------------------------
>
>                 Key: KAFKA-5732
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5732
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.11.0.0
>         Environment: Linux
>            Reporter: Ramkumar
>         Attachments: dumptest5
>
>
> Hi,
> I upgraded my 3-node Kafka cluster from 0.8 to the 0.11 broker. I am trying
> to test the new consumer APIs.
> Below is the code extract. The consumer.poll() method hangs (thread dump
> attached) when I use the consumer.subscribe() method. poll() does return
> records if I use the consumer.seek() methods instead. Please let me know
> what I am doing incorrectly. I have advertised.host and listeners set
> correctly in server.properties.
>     Properties props1 = new Properties();
>     props1.put("bootstrap.servers", "localhost:9092");
>     props1.put("group.id", "test3");
>     props1.put("enable.auto.commit", "false");
>     props1.put("auto_offset_reset", "earliest");
>     props1.put("request.timeout.ms", 30000);
>     props1.put("key.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>     props1.put("value.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>     String TestTopic = "T3";
>     KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props1);
>     consumer1.subscribe(Arrays.asList(TestTopic));
>     int j = 0;
>     // Poll ten times, printing and writing out every record received.
>     while (j < 10) {
>         j++;
>         ConsumerRecords<String, String> records1 = consumer1.poll(100);
>         for (ConsumerRecord<String, String> record1 : records1) {
>             System.out.printf("offset = %d, key = %s, value = %s",
>                     record1.offset(), record1.key(), record1.value());
>             String t = record1.value();
>             // 'out' is defined outside this extract.
>             out.write(t.getBytes());
>         }
>     }
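
One detail in the quoted snippet that may be worth checking separately from the log directory question: the key "auto_offset_reset" is written with underscores, while the consumer setting is named "auto.offset.reset". As far as I know, the client ignores unknown keys (with a warning in the log) and falls back to the default reset policy, so "earliest" would not take effect. The sketch below is a hypothetical helper, not part of the Kafka API. It assumes client config names are dot-separated and flags any key containing an underscore. It is not claimed to explain the hang itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;

public class ConfigKeyCheck {

    // Returns every property key that contains an underscore, in sorted order.
    // Assumption: Kafka client config names are dot-separated, so an
    // underscore usually signals a typo such as "auto_offset_reset".
    public static List<String> suspiciousKeys(Properties props) {
        List<String> suspicious = new ArrayList<>();
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            if (key.contains("_")) {
                suspicious.add(key);
            }
        }
        return suspicious;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test3");
        props.put("enable.auto.commit", "false");
        props.put("auto_offset_reset", "earliest"); // key as written in the report
        System.out.println(suspiciousKeys(props));
    }
}
```

Running the check before constructing the KafkaConsumer gives an early hint about misspelled keys without needing a broker connection.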



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
