Shixiong Zhu created KAFKA-4879:
-----------------------------------
Summary: KafkaConsumer.position may hang forever when deleting a topic
Key: KAFKA-4879
URL: https://issues.apache.org/jira/browse/KAFKA-4879
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.10.2.0
Reporter: Shixiong Zhu
KafkaConsumer.position may hang forever when the topic is deleted. The problem is
at this line:
https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
The timeout passed there is Long.MAX_VALUE, so the call keeps retrying forever on
UnknownTopicOrPartitionException.
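To illustrate the failure mode without a broker, here is a minimal, Kafka-free sketch of a retry loop with a caller-supplied timeout. It is not the Fetcher code: RetriableError, DeadlineExceeded and retryUntil are hypothetical names made up for this example, with RetriableError standing in for UnknownTopicOrPartitionException. Passing Long.MAX_VALUE as the timeout makes the loop effectively unbounded, which is the behaviour described above; a finite timeout hands control back to the caller.

```java
import java.util.function.Supplier;

public class BoundedRetrySketch {

    /** Hypothetical stand-in for a retriable error such as UnknownTopicOrPartitionException. */
    static class RetriableError extends RuntimeException {
        RetriableError(String message) { super(message); }
    }

    /** Hypothetical exception raised by this sketch once the timeout has elapsed. */
    static class DeadlineExceeded extends RuntimeException {
        DeadlineExceeded(String message, Throwable cause) { super(message, cause); }
    }

    /**
     * Retries the call on RetriableError until it succeeds or timeoutMs has elapsed.
     * With timeoutMs == Long.MAX_VALUE the elapsed-time check can never pass,
     * so a call that always fails is retried forever.
     */
    static <T> T retryUntil(Supplier<T> call, long timeoutMs, long backoffMs) {
        long startNanos = System.nanoTime();
        while (true) {
            try {
                return call.get();
            } catch (RetriableError e) {
                long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000L;
                if (elapsedMs >= timeoutMs) {
                    throw new DeadlineExceeded("gave up after " + elapsedMs + " ms", e);
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new DeadlineExceeded("interrupted while retrying", ie);
                }
            }
        }
    }

    public static void main(String[] args) {
        // Simulates an offset lookup against a deleted topic: it never succeeds.
        Supplier<Long> lookup = () -> {
            throw new RetriableError("topic no longer exists");
        };
        try {
            // A finite timeout (200 ms here) ends the loop; Long.MAX_VALUE would not.
            retryUntil(lookup, 200L, 50L);
        } catch (DeadlineExceeded e) {
            System.out.println("bounded retry returned control: " + e.getMessage());
        }
    }
}
```

Whether position should accept a timeout or fail fast is a question for the actual fix; the sketch only shows why an unbounded timeout turns a permanently failing lookup into a hang.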
Here is a reproducer:
{code}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;
import java.util.Set;

public class KafkaReproducer {

    public static void main(String[] args) {
        // Make sure "delete.topic.enable" is set to true on the broker.
        // Create the topic "test" with 3 partitions manually before running.
        // The issue does not reproduce when the topic has only one partition.
        String topic = "test";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "testgroup");
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
        kc.subscribe(Collections.singletonList(topic));
        // Poll once so the consumer joins the group and receives its assignment.
        kc.poll(0);
        Set<TopicPartition> partitions = kc.assignment();
        System.out.println("partitions: " + partitions);
        kc.pause(partitions);
        kc.seekToEnd(partitions);

        System.out.println("please delete the topic in 30 seconds");
        try {
            // Sleep 30 seconds to give us enough time to delete the topic.
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("sleep end");

        // position() hangs here once the topic has been deleted.
        for (TopicPartition p : partitions) {
            System.out.println(p + " offset: " + kc.position(p));
        }
        System.out.println("cannot reach here");
        kc.close();
    }
}
{code}