On 9/8/15 6:58 PM, Jason Gustafson wrote: > Hey Phil, > > You've stumbled onto one of the tricky aspects of the new consumer that > we've been talking about recently. KafkaConsumer.subscribe() is > asynchronous in the sense that it will return before partitions have been > assigned. We could make it synchronous, but we wouldn't be able to > guarantee how long the assignment would be active since other members of > the group or metadata changes can cause the coordinator to rebalance the > assignment. The best place to perform a seek would probably be in the > rebalance callback, which can be passed through the alternative subscribe > API. The code might look something like this: > > consumer.subscribe(topics, new RebalanceListener() { > void onPartitionsAssigned(List<TopicPartition> partitions) { > // seek to the initial offset for the assigned partitions here > } > void onPartitionsRevoked(List<TopicPartition> partitions) { > // commit offsets if you need to > } > }); > > while (true) { > ConsumerRecords records = consumer.poll(100); > // do stuff with records > } > > Does that make sense?
Yes, this makes sense. Thanks! Phil > > > Thanks, > Jason > > > On Tue, Sep 8, 2015 at 2:59 PM, Phil Steitz <phil.ste...@gmail.com> wrote: > >> I have been experimenting with the KafkaConsumer currently in >> development [1]. Sorry if this should be a question for the user >> list, but I am not sure if what I am seeing is something not working >> yet or if I am misunderstanding the API. If I use >> KafkaConsumer#subscribe to subscribe to a topic and then try to use >> seek(TopicPartion, offset) to position the consumer, I get an >> IllegalStateException with message "No current assignment for >> partition ...." If I use assign instead to connect to the topic, >> things work fine. I can see why this is by looking at the >> SubscriptionState code which is throwing the ISE because >> SubscriptionState#seek expects to find an assignment, but >> KafkaConsumer#subscribe does not make any. >> >> I know this is unreleased code and I am not looking for help here - >> actually more like looking *to* help but just learning the code. >> Happy to open a ticket with a test case if that will help or a patch >> to the javadoc if I am misunderstanding the API and it can be made >> clearer. >> >> Thanks! >> >> Phil >> >> [1] ff189fa05ccdacac100f3d15d167dcbe561f57a6 >> >>