[ https://issues.apache.org/jira/browse/KAFKA-14832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zou shengfu updated KAFKA-14832: -------------------------------- Description: {code:java} groupManager.storeGroup(group, groupAssignment, error => { if (error != Errors.NONE) { warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}") // Failed to persist member.id of the given static member, revert the update of the static member in the group. group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null) val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId) completeAndScheduleNextHeartbeatExpiration(group, oldMember) responseCallback(JoinGroupResult( List.empty, memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = currentLeader, skipAssignment = false, error = error )) } else if (supportSkippingAssignment) { // Starting from version 9 of the JoinGroup API, static members are able to // skip running the assignor based on the `SkipAssignment` field. We leverage // this to tell the leader that it is the leader of the group but by skipping // running the assignor while the group is in stable state. // Notes: // 1) This allows the leader to continue monitoring metadata changes for the // group. Note that any metadata changes happening while the static leader is // down won't be noticed. // 2) The assignors are not idempotent nor free from side effects. This is why // we skip entirely the assignment step as it could generate a different group // assignment which would be ignored by the group coordinator because the group // is the stable state. val isLeader = group.isLeader(newMemberId) group.maybeInvokeJoinCallback(member, JoinGroupResult( members = if (isLeader) { group.currentMemberMetadata } else { List.empty }, memberId = newMemberId, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = group.leaderOrNull, skipAssignment = isLeader, error = Errors.NONE )) } else { // Prior to version 9 of the JoinGroup API, we wanted to avoid current leader // performing trivial assignment while the group is in stable stage, because // the new assignment in leader's next sync call won't be broadcast by a stable group. // This could be guaranteed by always returning the old leader id so that the current // leader won't assume itself as a leader based on the returned message, since the new // member.id won't match returned leader id, therefore no assignment will be performed. group.maybeInvokeJoinCallback(member, JoinGroupResult( members = List.empty, memberId = newMemberId, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = currentLeader, skipAssignment = false, error = Errors.NONE )) }{code} was: {code:java} //代码占位符 {code} groupManager.storeGroup(group, groupAssignment, error => \{ if (error != Errors.NONE) { warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}") // Failed to persist member.id of the given static member, revert the update of the static member in the group. group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null) val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId) completeAndScheduleNextHeartbeatExpiration(group, oldMember) responseCallback(JoinGroupResult( List.empty, memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = currentLeader, skipAssignment = false, error = error )) } else if (supportSkippingAssignment) \{ // Starting from version 9 of the JoinGroup API, static members are able to // skip running the assignor based on the `SkipAssignment` field. We leverage // this to tell the leader that it is the leader of the group but by skipping // running the assignor while the group is in stable state. // Notes: // 1) This allows the leader to continue monitoring metadata changes for the // group. Note that any metadata changes happening while the static leader is // down won't be noticed. // 2) The assignors are not idempotent nor free from side effects. This is why // we skip entirely the assignment step as it could generate a different group // assignment which would be ignored by the group coordinator because the group // is the stable state. val isLeader = group.isLeader(newMemberId) group.maybeInvokeJoinCallback(member, JoinGroupResult( members = if (isLeader) { group.currentMemberMetadata } else \{ List.empty }, memberId = newMemberId, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = group.leaderOrNull, skipAssignment = isLeader, error = Errors.NONE )) } else \{ // Prior to version 9 of the JoinGroup API, we wanted to avoid current leader // performing trivial assignment while the group is in stable stage, because // the new assignment in leader's next sync call won't be broadcast by a stable group. // This could be guaranteed by always returning the old leader id so that the current // leader won't assume itself as a leader based on the returned message, since the new // member.id won't match returned leader id, therefore no assignment will be performed. group.maybeInvokeJoinCallback(member, JoinGroupResult( members = List.empty, memberId = newMemberId, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = currentLeader, skipAssignment = false, error = Errors.NONE )) } Summary: Thread unsafe for GroupMetadata when persisting metadata (was: Thread unsafe for GroupMetadata) > Thread unsafe for GroupMetadata when persisting metadata > --------------------------------------------------------- > > Key: KAFKA-14832 > URL: https://issues.apache.org/jira/browse/KAFKA-14832 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 3.3.2 > Reporter: zou shengfu > Assignee: zou shengfu > Priority: Major > > {code:java} > groupManager.storeGroup(group, groupAssignment, error => { > if (error != Errors.NONE) { warn(s"Failed to persist metadata > for group ${group.groupId}: ${error.message}") > // Failed to persist member.id of the given static member, > revert the update of the static member in the group. > group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, > oldSessionTimeoutMs, null) val oldMember = > group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId) > completeAndScheduleNextHeartbeatExpiration(group, oldMember) > responseCallback(JoinGroupResult( List.empty, > memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID, generationId = > group.generationId, protocolType = group.protocolType, > protocolName = group.protocolName, leaderId = > currentLeader, skipAssignment = false, error = > error )) } else if (supportSkippingAssignment) { > // Starting from version 9 of the JoinGroup API, static members are > able to // skip running the assignor based on the > `SkipAssignment` field. We leverage // this to tell the leader > that it is the leader of the group but by skipping // running > the assignor while the group is in stable state. // Notes: > // 1) This allows the leader to continue monitoring metadata changes > for the // group. Note that any metadata changes happening while > the static leader is // down won't be noticed. // > 2) The assignors are not idempotent nor free from side effects. This is why > // we skip entirely the assignment step as it could generate a > different group // assignment which would be ignored by the > group coordinator because the group // is the stable state. > val isLeader = group.isLeader(newMemberId) > group.maybeInvokeJoinCallback(member, JoinGroupResult( members > = if (isLeader) { group.currentMemberMetadata > } else { List.empty }, > memberId = newMemberId, generationId = group.generationId, > protocolType = group.protocolType, protocolName = > group.protocolName, leaderId = group.leaderOrNull, > skipAssignment = isLeader, error = Errors.NONE > )) } else { // Prior to version 9 of the JoinGroup > API, we wanted to avoid current leader // performing trivial > assignment while the group is in stable stage, because // the > new assignment in leader's next sync call won't be broadcast by a stable > group. // This could be guaranteed by always returning the old > leader id so that the current // leader won't assume itself as a > leader based on the returned message, since the new // member.id > won't match returned leader id, therefore no assignment will be performed. > group.maybeInvokeJoinCallback(member, JoinGroupResult( > members = List.empty, memberId = newMemberId, > generationId = group.generationId, protocolType = > group.protocolType, protocolName = group.protocolName, > leaderId = currentLeader, skipAssignment = false, > error = Errors.NONE )) }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)