Shixiong Zhu created KAFKA-4879: ----------------------------------- Summary: KafkaConsumer.position may hang forever when deleting a topic Key: KAFKA-4879 URL: https://issues.apache.org/jira/browse/KAFKA-4879 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.10.2.0 Reporter: Shixiong Zhu
KafkaConsumer.position may hang forever when deleting a topic. The problem is this line https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374 The timeout is "Long.MAX_VALUE", and it will just retry forever for UnknownTopicOrPartitionException. Here is a reproducer {code} import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Collections; import java.util.Properties; import java.util.Set; public class KafkaReproducer { public static void main(String[] args) { // Make sure "delete.topic.enable" is set to true. // Please create the topic test with "3" partitions manually. // The issue is gone when there is only one partition. String topic = "test"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "testgroup"); props.put("value.deserializer", StringDeserializer.class.getName()); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("enable.auto.commit", "false"); KafkaConsumer kc = new KafkaConsumer(props); kc.subscribe(Collections.singletonList(topic)); kc.poll(0); Set<TopicPartition> partitions = kc.assignment(); System.out.println("partitions: " + partitions); kc.pause(partitions); kc.seekToEnd(partitions); System.out.println("please delete the topic in 30 seconds"); try { // Sleep 30 seconds to give us enough time to delete the topic. Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("sleep end"); for (TopicPartition p : partitions) { System.out.println(p + " offset: " + kc.position(p)); } System.out.println("cannot reach here"); kc.close(); } } {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)