This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new beaba741d3e KAFKA-18263; Group lock must be acquired when reverting 
static membership rejoin (#18207)
beaba741d3e is described below

commit beaba741d3e3e30e070c0547e709e91113d688fa
Author: David Jacot <[email protected]>
AuthorDate: Mon Dec 16 18:20:35 2024 +0100

    KAFKA-18263; Group lock must be acquired when reverting static membership 
rejoin (#18207)
    
    When a static member rejoins the group, the group state is rewritten to the partition in order to persist the change. If the write fails, the change is reverted. However, the revert is done without acquiring the group lock, so it can race with concurrent updates to the same group.
    
    This only affects the old group coordinator. The new one does not suffer from this issue.
    
    Reviewers: Jeff Kim <[email protected]>
---
 .../kafka/coordinator/group/GroupCoordinator.scala | 126 +++++++++++----------
 1 file changed, 64 insertions(+), 62 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 58a37e3333b..94117d802ca 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -1376,68 +1376,70 @@ private[group] class GroupCoordinator(
           info(s"Static member which joins during Stable stage and doesn't 
affect selectProtocol will not trigger rebalance.")
           val groupAssignment: Map[String, Array[Byte]] = 
group.allMemberMetadata.map(member => member.memberId -> 
member.assignment).toMap
           groupManager.storeGroup(group, groupAssignment, error => {
-            if (error != Errors.NONE) {
-              warn(s"Failed to persist metadata for group ${group.groupId}: 
${error.message}")
-
-              // Failed to persist member.id of the given static member, 
revert the update of the static member in the group.
-              group.updateMember(knownStaticMember, oldProtocols, 
oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
-              val oldMember = group.replaceStaticMember(groupInstanceId, 
newMemberId, oldMemberId)
-              completeAndScheduleNextHeartbeatExpiration(group, oldMember)
-              responseCallback(JoinGroupResult(
-                List.empty,
-                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
-                generationId = group.generationId,
-                protocolType = group.protocolType,
-                protocolName = group.protocolName,
-                leaderId = currentLeader,
-                skipAssignment = false,
-                error = error
-              ))
-            } else if (supportSkippingAssignment) {
-              // Starting from version 9 of the JoinGroup API, static members 
are able to
-              // skip running the assignor based on the `SkipAssignment` 
field. We leverage
-              // this to tell the leader that it is the leader of the group 
but by skipping
-              // running the assignor while the group is in stable state.
-              // Notes:
-              // 1) This allows the leader to continue monitoring metadata 
changes for the
-              // group. Note that any metadata changes happening while the 
static leader is
-              // down won't be noticed.
-              // 2) The assignors are not idempotent nor free from side 
effects. This is why
-              // we skip entirely the assignment step as it could generate a 
different group
-              // assignment which would be ignored by the group coordinator 
because the group
-              // is the stable state.
-              val isLeader = group.isLeader(newMemberId)
-              group.maybeInvokeJoinCallback(member, JoinGroupResult(
-                members = if (isLeader) {
-                  group.currentMemberMetadata
-                } else {
-                  List.empty
-                },
-                memberId = newMemberId,
-                generationId = group.generationId,
-                protocolType = group.protocolType,
-                protocolName = group.protocolName,
-                leaderId = group.leaderOrNull,
-                skipAssignment = isLeader,
-                error = Errors.NONE
-              ))
-            } else {
-              // Prior to version 9 of the JoinGroup API, we wanted to avoid 
current leader
-              // performing trivial assignment while the group is in stable 
stage, because
-              // the new assignment in leader's next sync call won't be 
broadcast by a stable group.
-              // This could be guaranteed by always returning the old leader 
id so that the current
-              // leader won't assume itself as a leader based on the returned 
message, since the new
-              // member.id won't match returned leader id, therefore no 
assignment will be performed.
-              group.maybeInvokeJoinCallback(member, JoinGroupResult(
-                members = List.empty,
-                memberId = newMemberId,
-                generationId = group.generationId,
-                protocolType = group.protocolType,
-                protocolName = group.protocolName,
-                leaderId = currentLeader,
-                skipAssignment = false,
-                error = Errors.NONE
-              ))
+            group.inLock {
+              if (error != Errors.NONE) {
+                warn(s"Failed to persist metadata for group ${group.groupId}: 
${error.message}")
+
+                // Failed to persist member.id of the given static member, 
revert the update of the static member in the group.
+                group.updateMember(knownStaticMember, oldProtocols, 
oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
+                val oldMember = group.replaceStaticMember(groupInstanceId, 
newMemberId, oldMemberId)
+                completeAndScheduleNextHeartbeatExpiration(group, oldMember)
+                responseCallback(JoinGroupResult(
+                  List.empty,
+                  memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
+                  generationId = group.generationId,
+                  protocolType = group.protocolType,
+                  protocolName = group.protocolName,
+                  leaderId = currentLeader,
+                  skipAssignment = false,
+                  error = error
+                ))
+              } else if (supportSkippingAssignment) {
+                // Starting from version 9 of the JoinGroup API, static 
members are able to
+                // skip running the assignor based on the `SkipAssignment` 
field. We leverage
+                // this to tell the leader that it is the leader of the group 
but by skipping
+                // running the assignor while the group is in stable state.
+                // Notes:
+                // 1) This allows the leader to continue monitoring metadata 
changes for the
+                // group. Note that any metadata changes happening while the 
static leader is
+                // down won't be noticed.
+                // 2) The assignors are not idempotent nor free from side 
effects. This is why
+                // we skip entirely the assignment step as it could generate a 
different group
+                // assignment which would be ignored by the group coordinator 
because the group
+                // is the stable state.
+                val isLeader = group.isLeader(newMemberId)
+                group.maybeInvokeJoinCallback(member, JoinGroupResult(
+                  members = if (isLeader) {
+                    group.currentMemberMetadata
+                  } else {
+                    List.empty
+                  },
+                  memberId = newMemberId,
+                  generationId = group.generationId,
+                  protocolType = group.protocolType,
+                  protocolName = group.protocolName,
+                  leaderId = group.leaderOrNull,
+                  skipAssignment = isLeader,
+                  error = Errors.NONE
+                ))
+              } else {
+                // Prior to version 9 of the JoinGroup API, we wanted to avoid 
current leader
+                // performing trivial assignment while the group is in stable 
stage, because
+                // the new assignment in leader's next sync call won't be 
broadcast by a stable group.
+                // This could be guaranteed by always returning the old leader 
id so that the current
+                // leader won't assume itself as a leader based on the 
returned message, since the new
+                // member.id won't match returned leader id, therefore no 
assignment will be performed.
+                group.maybeInvokeJoinCallback(member, JoinGroupResult(
+                  members = List.empty,
+                  memberId = newMemberId,
+                  generationId = group.generationId,
+                  protocolType = group.protocolType,
+                  protocolName = group.protocolName,
+                  leaderId = currentLeader,
+                  skipAssignment = false,
+                  error = Errors.NONE
+                ))
+              }
             }
           }, requestLocal)
         } else {

Reply via email to