[ 
https://issues.apache.org/jira/browse/KAFKA-2686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

The Data Lorax updated KAFKA-2686:
----------------------------------
    Description: 
The code snippet below demonstrates the problem.

The unsubscribe() call leaves the KafkaConsumer in a state in which poll() 
always returns empty record sets, even if newly assigned topic-partitions have 
messages pending. This is because unsubscribe() sets 
SubscriptionState.needsPartitionAssignment to true, and assign() does not 
clear this flag. The flag is cleared only when the consumer handles the 
response to a JoinGroup request, which manual assignment never sends.

{code}
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
ConsumerRecords<String, String> records = consumer.poll(100); // <- Works, returning records

consumer.unsubscribe();   // Puts consumer into invalid state.

consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2)));
records = consumer.poll(100);// <- Always returns empty record set.
{code}
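
To make the flag behaviour described above easier to follow, here is a minimal, self-contained toy model. It is only a sketch and not Kafka's actual code: ToySubscriptionState, onJoinGroupResponse(), the integer partition ids and the fake record strings are hypothetical names invented for illustration. Only the needsPartitionAssignment flag and the way unsubscribe() and assign() treat it are taken from this report.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy model only: NOT Kafka's real SubscriptionState. It mirrors just the
// flag handling described in this report.
class ToySubscriptionState {
    private final Set<Integer> assignedPartitions = new TreeSet<>();
    private boolean needsPartitionAssignment = false;

    // Manual assignment: replaces the partition set but, as reported,
    // leaves needsPartitionAssignment untouched.
    void assign(List<Integer> partitions) {
        assignedPartitions.clear();
        assignedPartitions.addAll(partitions);
    }

    // Clears the assignment and raises the flag.
    void unsubscribe() {
        assignedPartitions.clear();
        needsPartitionAssignment = true;
    }

    // Hypothetical stand-in for handling a JoinGroup response, the only
    // step that lowers the flag according to the report.
    void onJoinGroupResponse() {
        needsPartitionAssignment = false;
    }

    // Stand-in for poll(): returns nothing while the flag is raised,
    // otherwise one fake record per assigned partition.
    List<String> poll() {
        if (needsPartitionAssignment) {
            return Collections.emptyList();
        }
        List<String> records = new ArrayList<>();
        for (int partition : assignedPartitions) {
            records.add("record-from-partition-" + partition);
        }
        return records;
    }
}

class UnsubscribeFlagDemo {
    public static void main(String[] args) {
        ToySubscriptionState state = new ToySubscriptionState();

        state.assign(Collections.singletonList(1));
        System.out.println("after first assign:  " + state.poll().size());

        state.unsubscribe(); // raises the flag
        state.assign(Collections.singletonList(2)); // does not lower it
        System.out.println("after second assign: " + state.poll().size());
    }
}
```

In this model the second poll() stays empty until something lowers the flag, which matches the symptom in the snippet above. Whether the right fix is for assign() to clear the flag or for unsubscribe() not to set it is left open here.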



> unsubscribe() call leaves KafkaConsumer in invalid state for manual 
> topic-partition assignment
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2686
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2686
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0
>            Reporter: The Data Lorax
>            Assignee: Neha Narkhede
>
> The code snippet below demonstrates the problem.
> The unsubscribe() call leaves the KafkaConsumer in a state in which poll() 
> always returns empty record sets, even if newly assigned topic-partitions 
> have messages pending. This is because unsubscribe() sets 
> SubscriptionState.needsPartitionAssignment to true, and assign() does not 
> clear this flag. The flag is cleared only when the consumer handles the 
> response to a JoinGroup request, which manual assignment never sends.
> {code}
> final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
> ConsumerRecords<String, String> records = consumer.poll(100); // <- Works, returning records
> consumer.unsubscribe();   // Puts consumer into invalid state.
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2)));
> records = consumer.poll(100);// <- Always returns empty record set.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
