Richard Yu created KAFKA-8438:
---------------------------------

             Summary: Add API to allow user to define end behavior of consumer failure
                 Key: KAFKA-8438
                 URL: https://issues.apache.org/jira/browse/KAFKA-8438
             Project: Kafka
          Issue Type: New Feature
          Components: consumer
            Reporter: Richard Yu


Recently, in a concerted effort to make Kafka's rebalances less painful, 
various approaches have been used to reduce both the number and the impact of 
rebalances. A rebalance is often triggered by some kind of failure, in which 
case the workload is redistributed among the surviving threads. To reduce 
rebalances caused by random consumer crashes, a recent change to Kafka 
internals introduced the concept of static membership, which prevents a 
rebalance from occurring within {{session.timeout.ms}} in the hope that the 
crashed consumer thread recovers within that interval.

However, in some cases, consumer threads go down permanently or remain dead 
for long periods of time. In these scenarios, Kafka users might not become 
aware of the crash until hours after it happened, which forces them to manually 
start a new KafkaConsumer process long after the failure occurred. This is where 
a callback such as {{onConsumerFailure}} would help. The callback is defined by 
the user and has multiple use cases. {{onConsumerFailure}} is called when a 
particular consumer thread has been down for longer than a specified time 
interval (e.g. set by a config called 
{{acceptable.consumer.failure.timeout.ms}}). When called, this method could log 
the consumer failure or, should the user wish, create a new thread that rejoins 
the consumer group (configured with the failed member's {{group.instance.id}} 
so that a rebalance is not re-triggered).
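To make the proposed detection behavior concrete, here is a minimal Java sketch. Everything in it is hypothetical: {{ConsumerFailureListener}}, {{ConsumerFailureWatchdog}}, {{recordAlive}} and {{check}} are illustrative names, not part of the Kafka client API, and the {{acceptableFailureTimeoutMs}} field merely stands in for the proposed failure-timeout config. It assumes some component can observe each member's liveness (for example through heartbeats) and takes explicit timestamps instead of reading a clock so the logic is easy to follow.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical user-defined callback sketching the proposed onConsumerFailure hook.
// This interface does NOT exist in the Kafka client library.
interface ConsumerFailureListener {
    void onConsumerFailure(String groupInstanceId, long downForMs);
}

// Hypothetical watchdog: remembers when each member was last seen alive and
// invokes the listener once a member has been silent longer than the timeout.
final class ConsumerFailureWatchdog {
    private final long acceptableFailureTimeoutMs; // stand-in for the proposed timeout config
    private final ConsumerFailureListener listener;
    private final Map<String, Long> lastSeenMs = new HashMap<>();
    private final Set<String> reported = new HashSet<>();

    ConsumerFailureWatchdog(long acceptableFailureTimeoutMs, ConsumerFailureListener listener) {
        this.acceptableFailureTimeoutMs = acceptableFailureTimeoutMs;
        this.listener = listener;
    }

    // Called whenever a member shows signs of life (assumed: e.g. a heartbeat).
    void recordAlive(String groupInstanceId, long nowMs) {
        lastSeenMs.put(groupInstanceId, nowMs);
        reported.remove(groupInstanceId); // a recovered member may be reported again later
    }

    // Called periodically; fires the callback at most once per failure.
    void check(long nowMs) {
        // Iterate over a snapshot so the listener may safely call recordAlive().
        for (Map.Entry<String, Long> e : new HashMap<>(lastSeenMs).entrySet()) {
            long downForMs = nowMs - e.getValue();
            if (downForMs > acceptableFailureTimeoutMs && reported.add(e.getKey())) {
                listener.onConsumerFailure(e.getKey(), downForMs);
            }
        }
    }
}
```

A listener passed to the watchdog could, for instance, log the failure or start a replacement consumer configured with the same {{group.instance.id}}. Firing at most once per failure keeps a long outage from spawning a new replacement on every check.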

Should the old thread recover and attempt to rejoin the consumer group while 
the substitute thread is already a member, the old thread would be denied 
access and an exception would be thrown (to indicate that another process has 
already taken its place).
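The rejoin rule above can be sketched as a registry that records which member currently holds each {{group.instance.id}}. This is a hypothetical illustration only: {{StaticMemberRegistry}}, {{MemberReplacedException}}, {{registerSubstitute}} and {{rejoin}} are invented names, and the sketch does not reproduce how Kafka's group coordinator actually tracks static members.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical exception signalling that another process has taken this member's place.
class MemberReplacedException extends RuntimeException {
    MemberReplacedException(String message) {
        super(message);
    }
}

// Hypothetical registry mapping each group.instance.id to the member allowed to hold it.
final class StaticMemberRegistry {
    private final Map<String, String> ownerByInstanceId = new HashMap<>();

    // A substitute thread explicitly claims the instance id, displacing the failed owner.
    void registerSubstitute(String groupInstanceId, String memberId) {
        ownerByInstanceId.put(groupInstanceId, memberId);
    }

    // An ordinary (re)join: allowed for an unclaimed instance id or for its current owner.
    void rejoin(String groupInstanceId, String memberId) {
        String owner = ownerByInstanceId.putIfAbsent(groupInstanceId, memberId);
        if (owner != null && !owner.equals(memberId)) {
            throw new MemberReplacedException(
                "group.instance.id " + groupInstanceId + " is now held by " + owner);
        }
    }
}
```

Keying ownership on the instance id rather than on the thread means the recovered thread is rejected even though it presents the same {{group.instance.id}}, which is exactly the "another process has already taken its place" case.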

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
