Ramkumar created KAFKA-5732:
-------------------------------

             Summary: Kafka 0.11 Consumer.Poll() hangs for consumer.subscribe()
                 Key: KAFKA-5732
                 URL: https://issues.apache.org/jira/browse/KAFKA-5732
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.11.0.0
         Environment: Linux
            Reporter: Ramkumar
         Attachments: dumptest5

Hi,

I have upgraded my 3-node Kafka cluster from 0.8 to 0.11 brokers and am testing the new consumer API. Below is the code extract. After consumer.subscribe(), the consumer.poll() call hangs (thread dump attached). poll() does return records if I use the consumer.seek() methods. Please let me know what I am doing incorrectly. I have advertised.host and listeners set correctly in server.properties.

    Properties props1 = new Properties();
    props1.put("bootstrap.servers", "localhost:9092");
    props1.put("group.id", "test3");
    props1.put("enable.auto.commit", "false");
    // consumer config keys are dot-separated
    props1.put("auto.offset.reset", "earliest");
    props1.put("request.timeout.ms", 30000);
    props1.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props1.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    String TestTopic = "T3";
    KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props1);
    consumer1.subscribe(Arrays.asList(TestTopic));

    // poll at most 10 times, waiting up to 100 ms per call
    int j = 0;
    while (j < 10) {
        j++;
        ConsumerRecords<String, String> records1 = consumer1.poll(100);
        for (ConsumerRecord<String, String> record1 : records1) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record1.offset(), record1.key(), record1.value());
            String t = record1.value();
            // out is an output stream declared outside this extract
            out.write(t.getBytes());
        }
    }
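
A side note on the consumer properties: config names for the Java consumer are dot-separated (for example auto.offset.reset), so an underscore spelling such as auto_offset_reset is not a recognised key and the intended setting would not take effect. This is not offered as the cause of the hang. The sketch below is a small pre-flight check that flags such keys before the consumer is built. ConfigKeyCheck and misspelledKeys are hypothetical names, not part of the Kafka client, and the KNOWN set is a hand-picked subset covering only the keys used in this report.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of the Kafka client library.
class ConfigKeyCheck {

    // Assumption: a hand-picked subset of consumer config names, not the full list.
    static final Set<String> KNOWN = new HashSet<>(Arrays.asList(
            "bootstrap.servers", "group.id", "enable.auto.commit",
            "auto.offset.reset", "request.timeout.ms",
            "key.deserializer", "value.deserializer"));

    // Reports keys that are not in KNOWN but would be if '_' were replaced by '.'.
    static List<String> misspelledKeys(Properties props) {
        // Iterate over keySet() rather than stringPropertyNames(), because the
        // latter skips entries whose value is not a String (e.g. an Integer timeout).
        Set<String> keys = new TreeSet<>();
        for (Object k : props.keySet()) {
            keys.add(String.valueOf(k));
        }
        List<String> findings = new ArrayList<>();
        for (String key : keys) {
            String dotted = key.replace('_', '.');
            if (!KNOWN.contains(key) && KNOWN.contains(dotted)) {
                findings.add(key + " -> " + dotted);
            }
        }
        return findings;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test3");
        props.put("auto_offset_reset", "earliest");
        props.put("request.timeout.ms", 30000);
        // prints: auto_offset_reset -> auto.offset.reset
        for (String finding : misspelledKeys(props)) {
            System.out.println(finding);
        }
    }
}
```

Calling misspelledKeys(props1) just before new KafkaConsumer<>(props1) and logging any findings would be one way to use it. The check only catches underscore-for-dot slips against the listed names and says nothing about whether the values are valid.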