Boyang Chen created KAFKA-7018:
----------------------------------

             Summary: persist memberId for consumer restart
                 Key: KAFKA-7018
                 URL: https://issues.apache.org/jira/browse/KAFKA-7018
             Project: Kafka
          Issue Type: Improvement
          Components: consumer, streams
            Reporter: Boyang Chen
            Assignee: Boyang Chen


The group coordinator contains logic that skips the rebalance when an existing 
follower consumer sends a JoinGroup request with unchanged metadata:
{code:java}
case Empty | Stable =>
  if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
    // if the member id is unknown, register the member to the group
    addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId,
      clientHost, protocolType, protocols, group, responseCallback)
  } else {
    val member = group.get(memberId)
    if (group.isLeader(memberId) || !member.matches(protocols)) {
      // force a rebalance if a member has changed metadata or if the leader
      // sends JoinGroup. The latter allows the leader to trigger rebalances
      // for changes affecting assignment which do not affect the member
      // metadata (such as topic metadata changes for the consumer)
      updateMemberAndRebalance(group, member, protocols, responseCallback)
    } else {
      // for followers with no actual change to their metadata, just return
      // group information for the current generation which will allow them
      // to issue SyncGroup
      responseCallback(JoinGroupResult(
        members = Map.empty,
        memberId = memberId,
        generationId = group.generationId,
        subProtocol = group.protocolOrNull,
        leaderId = group.leaderOrNull,
        error = Errors.NONE))
    }
  }
{code}
While looking at AbstractCoordinator, I found that the generation is 
hard-coded to NO_GENERATION on restart, which means the consumer sends 
UNKNOWN_MEMBER_ID in its first JoinGroup request. The coordinator therefore 
treats the restarted consumer as a new member and triggers a rebalance, while 
the previous member entry stays in the group until its session timeout expires.
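
To make the consequence concrete, here is a minimal Java model of the branch 
quoted above. It is only a sketch for discussion, not the broker code: the 
class JoinDecisionSketch, the Outcome enum and the decide method are 
hypothetical names, and the two boolean parameters stand in for 
group.isLeader(memberId) and member.matches(protocols).

```java
/** Toy model of the Empty/Stable branch quoted above; NOT the broker code. */
final class JoinDecisionSketch {
    // JoinGroupRequest.UNKNOWN_MEMBER_ID is the empty string in the Kafka client.
    static final String UNKNOWN_MEMBER_ID = "";

    // Hypothetical labels for the three paths in the coordinator snippet.
    enum Outcome {
        ADD_MEMBER_AND_REBALANCE,
        UPDATE_MEMBER_AND_REBALANCE,
        RETURN_CURRENT_GENERATION
    }

    /**
     * @param memberId        member id carried in the JoinGroup request
     * @param isLeader        stand-in for group.isLeader(memberId)
     * @param metadataMatches stand-in for member.matches(protocols)
     */
    static Outcome decide(String memberId, boolean isLeader, boolean metadataMatches) {
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            // A restarted consumer lands here because it has lost its member id.
            return Outcome.ADD_MEMBER_AND_REBALANCE;
        }
        if (isLeader || !metadataMatches) {
            return Outcome.UPDATE_MEMBER_AND_REBALANCE;
        }
        // Known follower with unchanged metadata: no rebalance is needed.
        return Outcome.RETURN_CURRENT_GENERATION;
    }
}
```

With this model, decide("", false, true) takes the rebalance path, while the 
same follower rejoining with its old member id and unchanged metadata would 
take the RETURN_CURRENT_GENERATION path.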

I would like to clarify the following points before we extend the discussion:
 # Whether my understanding of the above logic is correct (hoping [~mjsax] can 
help me double-check).
 # Whether it makes sense to persist the memberId from the previous run for 
consumers. We currently need this only in Streams applications, but it would do 
no harm to support it for consumers in general. It would be a nice-to-have 
feature on consumer restart, enabled when a loading-previous-memberId config is 
set to true. If loading fails, we simply fall back to UNKNOWN_MEMBER_ID.
 # The behavior could also be changed on the broker side, but I suspect that is 
very risky. A client-side change should take the least effort. The end goal is 
to avoid excessive rebalances when the same consumer restarts, so if you feel a 
server-side change could also help, we can discuss it further.
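
A rough sketch of the client-side idea from the second point, assuming the 
memberId is kept in a local file. Everything here is hypothetical and for 
illustration only: the class MemberIdStore, its methods, the file location and 
the loadPreviousMemberId flag are not existing Kafka APIs or configs.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper that persists the last memberId across restarts. */
final class MemberIdStore {
    // JoinGroupRequest.UNKNOWN_MEMBER_ID is the empty string in the Kafka client.
    static final String UNKNOWN_MEMBER_ID = "";

    private final Path file;
    // Stand-in for the proposed "loading-previous-memberId" switch.
    private final boolean loadPreviousMemberId;

    MemberIdStore(Path file, boolean loadPreviousMemberId) {
        this.file = file;
        this.loadPreviousMemberId = loadPreviousMemberId;
    }

    /** Would be called after a successful JoinGroup response. */
    void save(String memberId) throws IOException {
        Files.write(file, memberId.getBytes(StandardCharsets.UTF_8));
    }

    /** Returns the persisted memberId, or UNKNOWN_MEMBER_ID if disabled or unreadable. */
    String loadOrUnknown() {
        if (!loadPreviousMemberId) {
            return UNKNOWN_MEMBER_ID;
        }
        try {
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            // Missing or unreadable file: behave exactly like today's restart.
            return UNKNOWN_MEMBER_ID;
        }
    }
}
```

The fallback keeps the change low-risk: if the file is missing or the feature 
is switched off, the consumer sends UNKNOWN_MEMBER_ID as it does today.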

Thank you for helping out! [~mjsax] [~guozhang]

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
