[ 
https://issues.apache.org/jira/browse/KAFKA-693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13552854#comment-13552854
 ] 

Jun Rao commented on KAFKA-693:
-------------------------------

That's a good point. I overlooked this. Your understanding is correct. We could 
move the offset initialization logic into AbstractFetcherThread. The following 
is one way to do this. Not sure if this is the best way.

1. In AbstractFetcher: 
Change addPartition to pass in initialOffset: Option[Long].
If initialOffset is none, we call handleOffsetOutOfRange to get the offset. If 
we hit any exception while doing this, we pass the exception to the caller 
without adding the partition to partitionMap.

2. In ConsumerFetcherManager.doWork():
If we hit any exception when calling addFetcher, we add the partition back to 
noLeaderPartitionSet.

3. In ConsumerFetcherThread.handleOffsetOutOfRange():
We need to check if the offset response has any error. If so, we throw an 
exception to the caller.

4. In ZookeeperConsumerConnector.addPartitionTopicInfo(): If initial offset 
doesn't exist in ZK, we pass in none to PartitionTopicInfo.

5. In PartitionTopicInfo: Make fetchedOffset Option[AtomicLong].

                
> Consumer rebalance fails if no leader available for a partition and stops all 
> fetchers
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-693
>                 URL: https://issues.apache.org/jira/browse/KAFKA-693
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Maxime Brugidou
>         Attachments: mirror_debug.log, mirror.log
>
>
> I am currently experiencing this with the MirrorMaker but I assume it happens 
> for any rebalance. The symptoms are:
> I have replication factor of 1
> 1. If i start the MirrorMaker (bin/kafka-run-class.sh kafka.tools.MirrorMaker 
> --consumer.config mirror-consumer.properties  --producer.config 
> mirror-producer.properties --blacklist 'xdummyx' --num.streams=1 
> --num.producers=1) with a broker down
> 1.1 I set the refresh.leader.backoff.ms to 600000 (10min) so that the 
> ConsumerFetcherManager doesn't retry to often to get the unavailable 
> partitions
> 1.2 The rebalance starts at the init step and fails: Exception in thread 
> "main" kafka.common.ConsumerRebalanceFailedException: 
> KafkaMirror_mirror-01-1357893495345-fac86b15 can't rebalance after 4 retries
> 1.3 After the exception, everything stops (fetchers and queues)
> 1.4 I attached the full logs (info & debug) for this case
> 2. If i start the MirrorMaker with all the brokers up and then kill a broker
> 2.1 The first rebalance is successful
> 2.2 The consumer will handle correctly the broker down and stop the 
> associated ConsumerFetcherThread
> 2.3 The refresh.leader.backoff.ms to 600000 works correctly
> 2.4 If something triggers a rebalance (new topic, partition reassignment...), 
> then we go back to 1., the rebalance fails and stops everything.
> I think the desired behavior is to consumer whatever is available, and try 
> later at some intervals. I would be glad to help on that issue although the 
> Consumer code seems a little tough to get on.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to