[ https://issues.apache.org/jira/browse/KAFKA-5732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137222#comment-16137222 ]
Ramkumar commented on KAFKA-5732:
---------------------------------

I found that this issue occurs if the broker points to the old data log directory (the one Kafka 0.8 was using). If I point it to a new directory, it appears to work fine. Any pointers on why it does not work when referring to the Kafka 0.8 data log folders, and on what is needed to make them compatible? I had set the message format and protocol versions as below:

inter.broker.protocol.version=0.11.0
log.message.format.version=0.11.0

> Kafka 0.11 Consumer.Poll() hangs for consumer.subscribe()
> ----------------------------------------------------------
>
>                 Key: KAFKA-5732
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5732
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.11.0.0
>         Environment: Linux
>            Reporter: Ramkumar
>         Attachments: dumptest5
>
>
> Hi,
> I have upgraded my 3-node Kafka cluster from the 0.8 to the 0.11 broker. I am
> trying to test the new consumer APIs.
> Below is the code extract. The consumer.poll() method hangs (thread dump
> attached) when I use the consumer.subscribe() method. poll() returns records
> if I use the consumer.seek() methods. Please let me know what I am doing
> incorrectly.
> I have updated advertised.host and listeners in server.properties.
> Thread dump attached.
> Properties props1 = new Properties();
> props1.put("bootstrap.servers", "localhost:9092");
> props1.put("group.id", "test3");
> props1.put("enable.auto.commit", "false");
> props1.put("auto_offset_reset", "earliest");
> props1.put("request.timeout.ms", 30000);
> props1.put("key.deserializer",
>     "org.apache.kafka.common.serialization.StringDeserializer");
> props1.put("value.deserializer",
>     "org.apache.kafka.common.serialization.StringDeserializer");
> String TestTopic = "T3";
> KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props1);
> consumer1.subscribe(Arrays.asList(TestTopic));
> int j = 0;
> while (j < 10) {
>     j++;
>     ConsumerRecords<String, String> records1 = consumer1.poll(100);
>     for (ConsumerRecord<String, String> record1 : records1) {
>         System.out.printf("offset = %d, key = %s, value = %s",
>             record1.offset(), record1.key(), record1.value());
>         String t = record1.value();
>         out.write(t.getBytes());
>     }

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
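
One detail in the quoted extract may be worth checking separately from the log directory question: the offset reset key is written as auto_offset_reset, whereas the consumer configuration name is auto.offset.reset. A key the client does not recognise is not applied, so the consumer would fall back to the default reset policy (latest). Whether this has anything to do with the hang is not established here. The sketch below is a stand-alone check that does not use the Kafka client at all; the class name ConsumerPropsCheck, the helper names, and the KNOWN_KEYS subset are hypothetical and cover only the keys that appear in the report.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

public class ConsumerPropsCheck {
    // Hypothetical subset of consumer config names: only the keys used in the
    // report, spelled with dots. This is not the full list the client accepts.
    static final Set<String> KNOWN_KEYS = new HashSet<>(Arrays.asList(
            "bootstrap.servers", "group.id", "enable.auto.commit",
            "auto.offset.reset", "request.timeout.ms",
            "key.deserializer", "value.deserializer"));

    // Returns the property names that are not in KNOWN_KEYS, sorted.
    // Note: stringPropertyNames() only sees entries whose key and value are
    // both Strings, so values are set as strings in reportedProps().
    static Set<String> unknownKeys(Properties props) {
        Set<String> unknown = new TreeSet<>();
        for (String name : props.stringPropertyNames()) {
            if (!KNOWN_KEYS.contains(name)) {
                unknown.add(name);
            }
        }
        return unknown;
    }

    // The properties from the report, with the key spelled as reported.
    static Properties reportedProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test3");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto_offset_reset", "earliest"); // underscores, as reported
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Prints: Unrecognised keys: [auto_offset_reset]
        System.out.println("Unrecognised keys: " + unknownKeys(reportedProps()));
    }
}
```

Replacing the key with auto.offset.reset makes the check come back empty. The Kafka client itself also logs a warning for supplied configuration names it does not know, so the client log is another place to look.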