Ramkumar created KAFKA-5732:
-------------------------------
Summary: Kafka 0.11 Consumer.Poll() hangs for consumer.subscribe()
Key: KAFKA-5732
URL: https://issues.apache.org/jira/browse/KAFKA-5732
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.11.0.0
Environment: Linux
Reporter: Ramkumar
Attachments: dumptest5
Hi,
I am upgraded my 3 node kafka cluster from 0.8 to 0.11 broker. I am trying to
test the new consumer APIs.
Below is the code extract. consumer.poll() method goes for a toss (thread dump
attached) for consumer.subscribe() method . This poll returns value if I use
consumer.seek() methods. Please let me know what i am doing incorrectly. i have
the advertised.host and listeners updated okay in server.properties. Thread
dump attached.
Properties props1 = new Properties();
props1.put("bootstrap.servers", "localhost:9092");
props1.put("group.id", "test3");
props1.put("enable.auto.commit", "false");
props1.put("auto_offset_reset", "earliest");
props1.put("request.timeout.ms", 30000);
props1.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props1.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
String TestTopic="T3";
KafkaConsumer<String, String> consumer1 = new
KafkaConsumer<>(props1);
consumer1.subscribe(Arrays.asList(TestTopic));
int j = 0;
while (j < 10) {
j++;
ConsumerRecords<String, String>
records1=consumer1.poll(100);
for (ConsumerRecord<String, String> record1 :
records1) {
System.out.printf("offset = %d, key =
%s, value = %s", record1.offset(), record1.key(),
record1.value());
String t = record1.value();
out.write(t.getBytes());
}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)