[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301018#comment-16301018
 ] 

ASF GitHub Bot commented on KAFKA-4879:
---------------------------------------

hachikuji commented on issue #3637: KAFKA-4879 KafkaConsumer.position may hang 
forever when deleting a topic
URL: https://github.com/apache/kafka/pull/3637#issuecomment-353532046
 
 
   Closing this since I'm taking over the issue.



> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
>                 Key: KAFKA-4879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4879
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Shixiong Zhu
>            Assignee: Jason Gustafson
>             Fix For: 1.1.0
>
>
> KafkaConsumer.position may hang forever when a topic is deleted. The problem is 
> this line:
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is Long.MAX_VALUE, so the call just retries forever on 
> UnknownTopicOrPartitionException.
> Here is a reproducer:
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
>     // Make sure "delete.topic.enable" is set to true.
>     // Please create the topic test with "3" partitions manually.
>     // The issue is gone when there is only one partition.
>     String topic = "test";
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "testgroup");
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("enable.auto.commit", "false");
>     KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
>     kc.subscribe(Collections.singletonList(topic));
>     kc.poll(0);
>     Set<TopicPartition> partitions = kc.assignment();
>     System.out.println("partitions: " + partitions);
>     kc.pause(partitions);
>     kc.seekToEnd(partitions);
>     System.out.println("please delete the topic in 30 seconds");
>     try {
>       // Sleep 30 seconds to give us enough time to delete the topic.
>       Thread.sleep(30000);
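>     } catch (InterruptedException e) {
>       // Restore the interrupt flag so callers can still observe it.
>       Thread.currentThread().interrupt();
>       e.printStackTrace();
>     }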
>     System.out.println("sleep end");
>     for (TopicPartition p : partitions) {
>       // With the topic deleted, position() never returns.
>       System.out.println(p + " offset: " + kc.position(p));
>     }
>     System.out.println("cannot reach here");
>     kc.close();
>   }
> }
> {code}
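
The retry behaviour described in the issue can be illustrated without a broker. The following is only a minimal sketch, not the actual Fetcher code or the eventual fix: BoundedRetry, retryUntilDeadline, RetriableException and DeadlineExceededException are hypothetical names invented for this example, with RetriableException standing in for UnknownTopicOrPartitionException. It shows a retry loop that gives up once a deadline has passed. Passing Long.MAX_VALUE as the timeout makes the deadline check effectively unreachable, which matches the hang reported above.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class BoundedRetry {

    /** Hypothetical stand-in for a retriable error such as UnknownTopicOrPartitionException. */
    static class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
    }

    /** Thrown when the deadline passes (or the thread is interrupted) before success. */
    static class DeadlineExceededException extends RuntimeException {
        DeadlineExceededException(String message, Throwable cause) { super(message, cause); }
    }

    /**
     * Runs the operation, retrying on RetriableException until it succeeds
     * or timeoutMs has elapsed. With timeoutMs = Long.MAX_VALUE the deadline
     * is effectively never reached, so a permanently failing operation loops forever.
     */
    static <T> T retryUntilDeadline(Supplier<T> operation, long timeoutMs, long backoffMs) {
        long start = System.nanoTime();
        // toNanos saturates at Long.MAX_VALUE instead of overflowing.
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            try {
                return operation.get();
            } catch (RetriableException e) {
                if (System.nanoTime() - start >= timeoutNanos) {
                    throw new DeadlineExceededException("gave up after " + timeoutMs + " ms", e);
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new DeadlineExceededException("interrupted while retrying", ie);
                }
            }
        }
    }

    public static void main(String[] args) {
        // Transient failure: the first two attempts fail, the third succeeds.
        AtomicInteger attempts = new AtomicInteger();
        Long offset = retryUntilDeadline(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new RetriableException("metadata not ready");
            }
            return 42L;
        }, 1000, 10);
        System.out.println("offset: " + offset + " after " + attempts.get() + " attempts");

        // Permanent failure (e.g. the topic was deleted): the deadline ends the loop.
        try {
            retryUntilDeadline(() -> {
                throw new RetriableException("topic deleted");
            }, 200, 20);
        } catch (DeadlineExceededException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

The design choice here is that the caller always supplies a finite timeout and gets an exception back when it expires, so a deleted topic surfaces as an error rather than a blocked thread.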



