This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new beaba741d3e KAFKA-18263; Group lock must be acquired when reverting
static membership rejoin (#18207)
beaba741d3e is described below
commit beaba741d3e3e30e070c0547e709e91113d688fa
Author: David Jacot <[email protected]>
AuthorDate: Mon Dec 16 18:20:35 2024 +0100
KAFKA-18263; Group lock must be acquired when reverting static membership
rejoin (#18207)
When a static member rejoins the group, the group state is rewritten to the
partition in order to persist the change. If the write fails, the change is
reverted. However, this is done without acquiring the group lock.
This is only true in the old group coordinator. The new one does not suffer
from this issue.
Reviewers: Jeff Kim <[email protected]>
---
.../kafka/coordinator/group/GroupCoordinator.scala | 126 +++++++++++----------
1 file changed, 64 insertions(+), 62 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 58a37e3333b..94117d802ca 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -1376,68 +1376,70 @@ private[group] class GroupCoordinator(
info(s"Static member which joins during Stable stage and doesn't
affect selectProtocol will not trigger rebalance.")
val groupAssignment: Map[String, Array[Byte]] =
group.allMemberMetadata.map(member => member.memberId ->
member.assignment).toMap
groupManager.storeGroup(group, groupAssignment, error => {
- if (error != Errors.NONE) {
- warn(s"Failed to persist metadata for group ${group.groupId}:
${error.message}")
-
- // Failed to persist member.id of the given static member,
revert the update of the static member in the group.
- group.updateMember(knownStaticMember, oldProtocols,
oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
- val oldMember = group.replaceStaticMember(groupInstanceId,
newMemberId, oldMemberId)
- completeAndScheduleNextHeartbeatExpiration(group, oldMember)
- responseCallback(JoinGroupResult(
- List.empty,
- memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
- generationId = group.generationId,
- protocolType = group.protocolType,
- protocolName = group.protocolName,
- leaderId = currentLeader,
- skipAssignment = false,
- error = error
- ))
- } else if (supportSkippingAssignment) {
- // Starting from version 9 of the JoinGroup API, static members
are able to
- // skip running the assignor based on the `SkipAssignment`
field. We leverage
- // this to tell the leader that it is the leader of the group
but by skipping
- // running the assignor while the group is in stable state.
- // Notes:
- // 1) This allows the leader to continue monitoring metadata
changes for the
- // group. Note that any metadata changes happening while the
static leader is
- // down won't be noticed.
- // 2) The assignors are not idempotent nor free from side
effects. This is why
- // we skip entirely the assignment step as it could generate a
different group
- // assignment which would be ignored by the group coordinator
because the group
- // is the stable state.
- val isLeader = group.isLeader(newMemberId)
- group.maybeInvokeJoinCallback(member, JoinGroupResult(
- members = if (isLeader) {
- group.currentMemberMetadata
- } else {
- List.empty
- },
- memberId = newMemberId,
- generationId = group.generationId,
- protocolType = group.protocolType,
- protocolName = group.protocolName,
- leaderId = group.leaderOrNull,
- skipAssignment = isLeader,
- error = Errors.NONE
- ))
- } else {
- // Prior to version 9 of the JoinGroup API, we wanted to avoid
current leader
- // performing trivial assignment while the group is in stable
stage, because
- // the new assignment in leader's next sync call won't be
broadcast by a stable group.
- // This could be guaranteed by always returning the old leader
id so that the current
- // leader won't assume itself as a leader based on the returned
message, since the new
- // member.id won't match returned leader id, therefore no
assignment will be performed.
- group.maybeInvokeJoinCallback(member, JoinGroupResult(
- members = List.empty,
- memberId = newMemberId,
- generationId = group.generationId,
- protocolType = group.protocolType,
- protocolName = group.protocolName,
- leaderId = currentLeader,
- skipAssignment = false,
- error = Errors.NONE
- ))
+ group.inLock {
+ if (error != Errors.NONE) {
+ warn(s"Failed to persist metadata for group ${group.groupId}:
${error.message}")
+
+ // Failed to persist member.id of the given static member,
revert the update of the static member in the group.
+ group.updateMember(knownStaticMember, oldProtocols,
oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
+ val oldMember = group.replaceStaticMember(groupInstanceId,
newMemberId, oldMemberId)
+ completeAndScheduleNextHeartbeatExpiration(group, oldMember)
+ responseCallback(JoinGroupResult(
+ List.empty,
+ memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
+ generationId = group.generationId,
+ protocolType = group.protocolType,
+ protocolName = group.protocolName,
+ leaderId = currentLeader,
+ skipAssignment = false,
+ error = error
+ ))
+ } else if (supportSkippingAssignment) {
+ // Starting from version 9 of the JoinGroup API, static
members are able to
+ // skip running the assignor based on the `SkipAssignment`
field. We leverage
+ // this to tell the leader that it is the leader of the group
but by skipping
+ // running the assignor while the group is in stable state.
+ // Notes:
+ // 1) This allows the leader to continue monitoring metadata
changes for the
+ // group. Note that any metadata changes happening while the
static leader is
+ // down won't be noticed.
+ // 2) The assignors are not idempotent nor free from side
effects. This is why
+ // we skip entirely the assignment step as it could generate a
different group
+ // assignment which would be ignored by the group coordinator
because the group
+ // is the stable state.
+ val isLeader = group.isLeader(newMemberId)
+ group.maybeInvokeJoinCallback(member, JoinGroupResult(
+ members = if (isLeader) {
+ group.currentMemberMetadata
+ } else {
+ List.empty
+ },
+ memberId = newMemberId,
+ generationId = group.generationId,
+ protocolType = group.protocolType,
+ protocolName = group.protocolName,
+ leaderId = group.leaderOrNull,
+ skipAssignment = isLeader,
+ error = Errors.NONE
+ ))
+ } else {
+ // Prior to version 9 of the JoinGroup API, we wanted to avoid
current leader
+ // performing trivial assignment while the group is in stable
stage, because
+ // the new assignment in leader's next sync call won't be
broadcast by a stable group.
+ // This could be guaranteed by always returning the old leader
id so that the current
+ // leader won't assume itself as a leader based on the
returned message, since the new
+ // member.id won't match returned leader id, therefore no
assignment will be performed.
+ group.maybeInvokeJoinCallback(member, JoinGroupResult(
+ members = List.empty,
+ memberId = newMemberId,
+ generationId = group.generationId,
+ protocolType = group.protocolType,
+ protocolName = group.protocolName,
+ leaderId = currentLeader,
+ skipAssignment = false,
+ error = Errors.NONE
+ ))
+ }
}
}, requestLocal)
} else {