[ 
https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14966003#comment-14966003
 ] 

Jiangjie Qin commented on KAFKA-2674:
-------------------------------------

[~guozhang] The original reason we added the consumer rebalance listener was 
that a rebalance can be triggered without the user being aware of it, and when 
a rebalance occurs the user might want to do something. This is different from 
the consumer closure case. When users close the consumer, they know what they 
are doing, and they will likely do some cleanup and any necessary state 
checkpointing before closing the consumer. I am not sure how valuable it is to 
put pre-closure tasks in onPartitionsRevoked(). To me it is a little confusing. 
Also, users might need to add a check to see whether the callback was caused by 
close() or by an actual rebalance (for example, when grabbing a lock, as I 
mentioned before).
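
To make the concern about the extra check concrete, here is a minimal sketch 
of what a listener might need if close() also invoked the revoke callback. 
RevokeListener is a hypothetical stand-in for ConsumerRebalanceListener (with 
partitions as plain strings) so that the example compiles without the Kafka 
client. The closing flag and the recorded "actions" are assumptions made for 
illustration and are not part of any Kafka API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for ConsumerRebalanceListener; partitions are plain strings.
interface RevokeListener {
    void onPartitionsRevoked(Collection<String> partitions);
}

// Listener that must tell a real rebalance apart from a user-initiated close.
class CloseAwareListener implements RevokeListener {
    private final AtomicBoolean closing = new AtomicBoolean(false);
    // Records what the listener decided to do, so the behavior can be inspected.
    final List<String> actions = new ArrayList<>();

    // The application would call this just before closing the consumer.
    void markClosing() {
        closing.set(true);
    }

    @Override
    public void onPartitionsRevoked(Collection<String> partitions) {
        if (closing.get()) {
            // The caller of close() already did its cleanup; skip rebalance-only work.
            actions.add("skip:" + partitions.size());
            return;
        }
        // Actual rebalance: this is where a lock would be grabbed and state checkpointed.
        actions.add("checkpoint:" + partitions.size());
    }
}
```

Every listener that does rebalance-only work would have to carry a flag like 
this, which is the extra burden described above.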

Good point about the Mirror Maker. The reason we pass the global assignment to 
the mirror maker's rebalance listener is that we want to allow mirror maker to 
do some administrative logic when a rebalance is triggered (e.g. when a 
rebalance is triggered because a new topic was created in the source cluster, 
we want to create the topic in the target cluster with the same number of 
partitions). For the rebalance listener to perform such actions, we need 
global knowledge of the topics so we know which topics changed. This global 
topic information is part of the global assignment (global topic info + owners).
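
As an illustration of that use case, the sketch below diffs the global topic 
information of the source and target clusters to find the topics that would 
have to be created on the target. Representing the topic info as a map from 
topic name to partition count, and the names TopicDiff and topicsToCreate, are 
assumptions for this example. This is not mirror maker code.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: topic info is modeled as topic name -> partition count.
class TopicDiff {
    // Returns the topics present in the source cluster but missing in the target,
    // together with the partition count each one should be created with.
    static Map<String, Integer> topicsToCreate(Map<String, Integer> source,
                                               Map<String, Integer> target) {
        Map<String, Integer> missing = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : source.entrySet()) {
            if (!target.containsKey(entry.getKey())) {
                missing.put(entry.getKey(), entry.getValue());
            }
        }
        return missing;
    }
}
```

A listener without the global view only sees its own partitions, so it could 
not compute this difference on its own.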

With the new client-side assignment patch, at least the leader has global 
assignment knowledge, so I think we should be fine for that use case. Letting 
the leader do everything is not ideal if the administrative work is heavy, but 
it is still doable.



> ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer 
> close
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-2674
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2674
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0
>            Reporter: Michal Turek
>            Assignee: Neha Narkhede
>
> Hi, I'm investigating and testing the behavior of the new consumer from the 
> planned release 0.9 and found an inconsistency in how rebalance callbacks are 
> called. I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT 
> called during consumer close and application shutdown. Its JavaDoc contract says:
> - "This method will be called before a rebalance operation starts and after 
> the consumer stops fetching data."
> - "It is recommended that offsets should be committed in this callback to 
> either Kafka or a custom offset store to prevent duplicate data."
> I believe calling consumer.close() is the start of a rebalance operation, and 
> even the local consumer that is actually closing should be notified so that it 
> can run any rebalance logic, including offset commits (e.g. if auto-commit is 
> disabled).
> Below are commented logs of the current and expected behavior.
> {noformat}
> // Application start
> 2015-10-20 15:14:02.208 INFO  o.a.k.common.utils.AppInfoParser    
> [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT 
> (AppInfoParser.java:82)
> 2015-10-20 15:14:02.208 INFO  o.a.k.common.utils.AppInfoParser    
> [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c 
> (AppInfoParser.java:83)
> // Consumer started (the first one in group), rebalance callbacks are called 
> including empty onPartitionsRevoked()
> 2015-10-20 15:14:02.333 INFO  c.a.e.kafka.newapi.TestConsumer     
> [TestConsumer-worker-0]: Rebalance callback, revoked: [] 
> (TestConsumer.java:95)
> 2015-10-20 15:14:02.343 INFO  c.a.e.kafka.newapi.TestConsumer     
> [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, 
> testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] 
> (TestConsumer.java:100)
> // Another consumer joined the group, rebalancing
> 2015-10-20 15:14:17.345 INFO  o.a.k.c.c.internals.Coordinator     
> [TestConsumer-worker-0]: Attempt to heart beat failed since the group is 
> rebalancing, try to re-join group. (Coordinator.java:714)
> 2015-10-20 15:14:17.346 INFO  c.a.e.kafka.newapi.TestConsumer     
> [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, 
> testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] 
> (TestConsumer.java:95)
> 2015-10-20 15:14:17.349 INFO  c.a.e.kafka.newapi.TestConsumer     
> [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, 
> testB-4, testA-3] (TestConsumer.java:100)
> // Consumer started closing, there SHOULD be onPartitionsRevoked() callback 
> to commit offsets like during standard rebalance, but it is missing
> 2015-10-20 15:14:39.280 INFO  c.a.e.kafka.newapi.TestConsumer     [main]: 
> Closing instance (TestConsumer.java:42)
> 2015-10-20 15:14:40.264 INFO  c.a.e.kafka.newapi.TestConsumer     
> [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89)
> {noformat}
> The workaround is to call onPartitionsRevoked() explicitly and manually just 
> before calling consumer.close(), but that seems dirty and error-prone to me. 
> It can easily be forgotten by someone without such experience.
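
The workaround described in the issue could be wrapped in a small helper so 
that it is harder to forget. This is only a sketch: RevokeCallback and 
ClosableConsumer are hypothetical stand-ins for ConsumerRebalanceListener and 
KafkaConsumer (with partitions as plain strings) so the example compiles 
without the Kafka client, and closeWithRevoke is an invented name.

```java
import java.util.ArrayList;
import java.util.Collection;

// Hypothetical stand-in for ConsumerRebalanceListener.
interface RevokeCallback {
    void onPartitionsRevoked(Collection<String> partitions);
}

// Hypothetical stand-in for the parts of the consumer used here.
interface ClosableConsumer {
    Collection<String> assignment();
    void close();
}

class ConsumerShutdown {
    // Invokes the revoke callback with the current assignment, then closes the
    // consumer. The finally block ensures close() runs even if the callback throws.
    static void closeWithRevoke(ClosableConsumer consumer, RevokeCallback callback) {
        try {
            callback.onPartitionsRevoked(new ArrayList<>(consumer.assignment()));
        } finally {
            consumer.close();
        }
    }
}
```

With a real consumer, the callback would typically commit offsets for the 
partitions it is handed before the consumer is closed.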



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
