[ 
https://issues.apache.org/jira/browse/KAFKA-693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13553751#comment-13553751
 ] 

Maxime Brugidou commented on KAFKA-693:
---------------------------------------

Here is a patch:

1. AbstractFetcherThread.addPartition(): call handleOffsetOutOfRange() if 
initialOffset < 0.
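
Roughly the idea, as a standalone Scala sketch (the types and the 
handleOffsetOutOfRange stub below are simplified placeholders, not the actual 
0.8 classes or signatures):

    // Sketch of the addPartition() change: if the initial offset is negative
    // (no committed offset), resolve a start offset via handleOffsetOutOfRange()
    // instead of registering the partition with an invalid offset.
    object AddPartitionSketch {
      // Placeholder for the per-partition state kept by the fetcher thread.
      final case class TopicAndPartition(topic: String, partition: Int)

      private val partitionMap = scala.collection.mutable.Map[TopicAndPartition, Long]()

      // Placeholder: in the real patch this asks the broker for the earliest or
      // latest offset, depending on auto.offset.reset.
      def handleOffsetOutOfRange(tp: TopicAndPartition): Long = 0L

      def addPartition(topic: String, partition: Int, initialOffset: Long): Unit = {
        val tp = TopicAndPartition(topic, partition)
        val startOffset =
          if (initialOffset < 0) handleOffsetOutOfRange(tp) // new consumer, no offset in ZK
          else initialOffset
        partitionMap.put(tp, startOffset)
      }

      def main(args: Array[String]): Unit = {
        addPartition("events", 0, -1L)
        println(partitionMap) // the partition is registered with the resolved offset
      }
    }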

2. I didn't touch ConsumerFetcherManager.doWork() since addFetcher() is called 
for partitions with leaders only (which is why 3 is unnecessary).

3. ConsumerFetcherThread.handleOffsetOutOfRange(): check 
partitionErrorAndOffset.error and throw an appropriate exception (which should 
have been done anyway; I don't think this is strictly necessary for the patch).
3.1 Note: this should probably be done in the ReplicaFetcherThread too?
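
Something along these lines, as a standalone sketch (the error code and the 
PartitionErrorAndOffset shape are simplified placeholders, not the actual 0.8 
classes):

    // Sketch of checking the error code returned alongside the offset before
    // trusting it, instead of silently using a bogus offset.
    object OffsetErrorCheckSketch {
      // Simplified placeholders for the broker error codes and response type.
      object ErrorMapping {
        val NoError: Short = 0
      }
      final case class PartitionErrorAndOffset(error: Short, offset: Long)

      def handleOffsetOutOfRange(response: PartitionErrorAndOffset): Long = {
        if (response.error != ErrorMapping.NoError)
          throw new RuntimeException(
            s"Offset request failed with error code ${response.error}")
        response.offset
      }

      def main(args: Array[String]): Unit =
        println(handleOffsetOutOfRange(PartitionErrorAndOffset(ErrorMapping.NoError, 42L))) // 42
    }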

4. ZookeeperConsumerConnector.ZkRebalanceListener: do not compute 
leaderIdForPartitionMap in rebalance(), and set the PartitionTopicInfo offsets 
to -1 if no offset is found in ZK (new consumer).
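
Roughly what that fallback looks like, as a standalone sketch (the ZK read and 
the PartitionTopicInfo constructor are simplified placeholders):

    // Sketch: when building per-partition state during a rebalance, fall back
    // to -1 if the group has no committed offset in ZooKeeper, and let the
    // fetcher thread resolve the real start offset later (see item 1).
    object RebalanceOffsetSketch {
      final case class PartitionTopicInfo(topic: String, partition: Int, fetchedOffset: Long)

      // Placeholder for reading the committed offset for this group from ZK.
      def readOffsetFromZk(topic: String, partition: Int): Option[Long] = None

      def buildPartitionInfo(topic: String, partition: Int): PartitionTopicInfo = {
        val offset = readOffsetFromZk(topic, partition).getOrElse(-1L) // -1 = new consumer
        PartitionTopicInfo(topic, partition, offset)
      }

      def main(args: Array[String]): Unit =
        println(buildPartitionInfo("events", 3)) // PartitionTopicInfo(events,3,-1)
    }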

5. PartitionTopicInfo: removed brokerId

6. Fixed the tests so they compile (I am having a hard time running them since 
./sbt test does not seem to work well for me).

7. Should we increase the default refresh.leader.backoff.ms? It's a tradeoff 
between picking up a new leader quickly (useful when replication is on) and 
not flooding the broker when there is no leader (or replication is off). 200ms 
is very short, but something hybrid like "try 5 times with a 200ms backoff, 
then every 5 minutes" would cover all use cases.
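
For illustration, such a hybrid policy could look like this (standalone 
sketch, not part of the patch; the retry count and delays are just the example 
numbers above):

    // Sketch of the hybrid backoff idea: retry quickly a few times, then fall
    // back to a long interval so a leaderless partition does not flood the
    // broker with metadata/offset requests.
    object HybridBackoffSketch {
      val shortBackoffMs = 200L           // the current refresh.leader.backoff.ms default
      val longBackoffMs  = 5 * 60 * 1000L // 5 minutes
      val fastRetries    = 5

      def backoffMs(attempt: Int): Long =
        if (attempt < fastRetries) shortBackoffMs else longBackoffMs

      def main(args: Array[String]): Unit =
        (0 until 7).foreach(a => println(s"attempt $a -> wait ${backoffMs(a)} ms"))
    }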

I am running this on test clusters with a MirrorMaker, and the error that I 
had in my initial test case (in the description) does not occur anymore.
                
> Consumer rebalance fails if no leader available for a partition and stops all 
> fetchers
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-693
>                 URL: https://issues.apache.org/jira/browse/KAFKA-693
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Maxime Brugidou
>         Attachments: KAFKA-693.patch, mirror_debug.log, mirror.log
>
>
> I am currently experiencing this with the MirrorMaker but I assume it happens 
> for any rebalance. The symptoms are:
> I have a replication factor of 1.
> 1. If I start the MirrorMaker (bin/kafka-run-class.sh kafka.tools.MirrorMaker 
> --consumer.config mirror-consumer.properties  --producer.config 
> mirror-producer.properties --blacklist 'xdummyx' --num.streams=1 
> --num.producers=1) with a broker down
> 1.1 I set refresh.leader.backoff.ms to 600000 (10 min) so that the 
> ConsumerFetcherManager doesn't retry too often to fetch the unavailable 
> partitions
> 1.2 The rebalance starts at the init step and fails: Exception in thread 
> "main" kafka.common.ConsumerRebalanceFailedException: 
> KafkaMirror_mirror-01-1357893495345-fac86b15 can't rebalance after 4 retries
> 1.3 After the exception, everything stops (fetchers and queues)
> 1.4 I attached the full logs (info & debug) for this case
> 2. If I start the MirrorMaker with all the brokers up and then kill a broker
> 2.1 The first rebalance is successful
> 2.2 The consumer correctly handles the broker going down and stops the 
> associated ConsumerFetcherThread
> 2.3 The refresh.leader.backoff.ms setting of 600000 works correctly
> 2.4 If something triggers a rebalance (new topic, partition reassignment...), 
> then we go back to 1., the rebalance fails and stops everything.
> I think the desired behavior is to consume whatever is available, and retry 
> later at some interval. I would be glad to help on this issue, although the 
> consumer code seems a little tough to get into.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
