[ https://issues.apache.org/jira/browse/KAFKA-2686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
The Data Lorax updated KAFKA-2686: ---------------------------------- Description: The bellow code snippet demonstrated the problem. Basically, the unsubscribe() call leaves the KafkaConsumer in a state that means poll() will always return empty record sets, even if new topic-partitions have been assigned that have messages pending. This is because unsubscribe() sets SubscriptionState.needsPartitionAssignment to true, and assign() does not clear this flag. The only thing that clears this flag is when the consumer handles the response from a JoinGroup request. {code} final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1))); ConsumerRecords<String, String> records = consumer.poll(100);// <- Works, returning records consumer.unsubscribe(); // Puts consumer into invalid state. consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2))); records = consumer.poll(100);// <- Always returns empty record set. {code} > unsubscribe() call leaves KafkaConsumer in invalid state for manual > topic-partition assignment > ---------------------------------------------------------------------------------------------- > > Key: KAFKA-2686 > URL: https://issues.apache.org/jira/browse/KAFKA-2686 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 0.9.0.0 > Reporter: The Data Lorax > Assignee: Neha Narkhede > > The bellow code snippet demonstrated the problem. > Basically, the unsubscribe() call leaves the KafkaConsumer in a state that > means poll() will always return empty record sets, even if new > topic-partitions have been assigned that have messages pending. This is > because unsubscribe() sets SubscriptionState.needsPartitionAssignment to > true, and assign() does not clear this flag. The only thing that clears this > flag is when the consumer handles the response from a JoinGroup request. > {code} > final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); > consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1))); > ConsumerRecords<String, String> records = consumer.poll(100);// <- Works, > returning records > consumer.unsubscribe(); // Puts consumer into invalid state. > consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2))); > records = consumer.poll(100);// <- Always returns empty record set. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)