This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 7a0568b891b KAFKA-18263; Group lock must be acquired when reverting 
static membership rejoin (#18207)
7a0568b891b is described below

commit 7a0568b891bd3fbd55a71b4cdd4f3518d1606cc2
Author: David Jacot <[email protected]>
AuthorDate: Mon Dec 16 18:20:35 2024 +0100

    KAFKA-18263; Group lock must be acquired when reverting static membership 
rejoin (#18207)
    
    When a static member rejoins the group, the group state is rewritten to the 
partition in order to persist the change. If the write fails, the change is 
reverted. However, this is done without acquiring the group lock.
    
    This is only true in the old group coordinator. The new one does not suffer 
from this issue.
    
    Reviewers: Jeff Kim <[email protected]>
---
 .../kafka/coordinator/group/GroupCoordinator.scala | 126 +++++++++++----------
 1 file changed, 64 insertions(+), 62 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 6a55e51d3ae..2d317e4b55f 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -1384,68 +1384,70 @@ private[group] class GroupCoordinator(
           info(s"Static member which joins during Stable stage and doesn't 
affect selectProtocol will not trigger rebalance.")
           val groupAssignment: Map[String, Array[Byte]] = 
group.allMemberMetadata.map(member => member.memberId -> 
member.assignment).toMap
           groupManager.storeGroup(group, groupAssignment, error => {
-            if (error != Errors.NONE) {
-              warn(s"Failed to persist metadata for group ${group.groupId}: 
${error.message}")
-
-              // Failed to persist member.id of the given static member, 
revert the update of the static member in the group.
-              group.updateMember(knownStaticMember, oldProtocols, 
oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
-              val oldMember = group.replaceStaticMember(groupInstanceId, 
newMemberId, oldMemberId)
-              completeAndScheduleNextHeartbeatExpiration(group, oldMember)
-              responseCallback(JoinGroupResult(
-                List.empty,
-                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
-                generationId = group.generationId,
-                protocolType = group.protocolType,
-                protocolName = group.protocolName,
-                leaderId = currentLeader,
-                skipAssignment = false,
-                error = error
-              ))
-            } else if (supportSkippingAssignment) {
-              // Starting from version 9 of the JoinGroup API, static members 
are able to
-              // skip running the assignor based on the `SkipAssignment` 
field. We leverage
-              // this to tell the leader that it is the leader of the group 
but by skipping
-              // running the assignor while the group is in stable state.
-              // Notes:
-              // 1) This allows the leader to continue monitoring metadata 
changes for the
-              // group. Note that any metadata changes happening while the 
static leader is
-              // down won't be noticed.
-              // 2) The assignors are not idempotent nor free from side 
effects. This is why
-              // we skip entirely the assignment step as it could generate a 
different group
-              // assignment which would be ignored by the group coordinator 
because the group
-              // is the stable state.
-              val isLeader = group.isLeader(newMemberId)
-              group.maybeInvokeJoinCallback(member, JoinGroupResult(
-                members = if (isLeader) {
-                  group.currentMemberMetadata
-                } else {
-                  List.empty
-                },
-                memberId = newMemberId,
-                generationId = group.generationId,
-                protocolType = group.protocolType,
-                protocolName = group.protocolName,
-                leaderId = group.leaderOrNull,
-                skipAssignment = isLeader,
-                error = Errors.NONE
-              ))
-            } else {
-              // Prior to version 9 of the JoinGroup API, we wanted to avoid 
current leader
-              // performing trivial assignment while the group is in stable 
stage, because
-              // the new assignment in leader's next sync call won't be 
broadcast by a stable group.
-              // This could be guaranteed by always returning the old leader 
id so that the current
-              // leader won't assume itself as a leader based on the returned 
message, since the new
-              // member.id won't match returned leader id, therefore no 
assignment will be performed.
-              group.maybeInvokeJoinCallback(member, JoinGroupResult(
-                members = List.empty,
-                memberId = newMemberId,
-                generationId = group.generationId,
-                protocolType = group.protocolType,
-                protocolName = group.protocolName,
-                leaderId = currentLeader,
-                skipAssignment = false,
-                error = Errors.NONE
-              ))
+            group.inLock {
+              if (error != Errors.NONE) {
+                warn(s"Failed to persist metadata for group ${group.groupId}: 
${error.message}")
+
+                // Failed to persist member.id of the given static member, 
revert the update of the static member in the group.
+                group.updateMember(knownStaticMember, oldProtocols, 
oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
+                val oldMember = group.replaceStaticMember(groupInstanceId, 
newMemberId, oldMemberId)
+                completeAndScheduleNextHeartbeatExpiration(group, oldMember)
+                responseCallback(JoinGroupResult(
+                  List.empty,
+                  memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
+                  generationId = group.generationId,
+                  protocolType = group.protocolType,
+                  protocolName = group.protocolName,
+                  leaderId = currentLeader,
+                  skipAssignment = false,
+                  error = error
+                ))
+              } else if (supportSkippingAssignment) {
+                // Starting from version 9 of the JoinGroup API, static 
members are able to
+                // skip running the assignor based on the `SkipAssignment` 
field. We leverage
+                // this to tell the leader that it is the leader of the group 
but by skipping
+                // running the assignor while the group is in stable state.
+                // Notes:
+                // 1) This allows the leader to continue monitoring metadata 
changes for the
+                // group. Note that any metadata changes happening while the 
static leader is
+                // down won't be noticed.
+                // 2) The assignors are not idempotent nor free from side 
effects. This is why
+                // we skip entirely the assignment step as it could generate a 
different group
+                // assignment which would be ignored by the group coordinator 
because the group
+                // is the stable state.
+                val isLeader = group.isLeader(newMemberId)
+                group.maybeInvokeJoinCallback(member, JoinGroupResult(
+                  members = if (isLeader) {
+                    group.currentMemberMetadata
+                  } else {
+                    List.empty
+                  },
+                  memberId = newMemberId,
+                  generationId = group.generationId,
+                  protocolType = group.protocolType,
+                  protocolName = group.protocolName,
+                  leaderId = group.leaderOrNull,
+                  skipAssignment = isLeader,
+                  error = Errors.NONE
+                ))
+              } else {
+                // Prior to version 9 of the JoinGroup API, we wanted to avoid 
current leader
+                // performing trivial assignment while the group is in stable 
stage, because
+                // the new assignment in leader's next sync call won't be 
broadcast by a stable group.
+                // This could be guaranteed by always returning the old leader 
id so that the current
+                // leader won't assume itself as a leader based on the 
returned message, since the new
+                // member.id won't match returned leader id, therefore no 
assignment will be performed.
+                group.maybeInvokeJoinCallback(member, JoinGroupResult(
+                  members = List.empty,
+                  memberId = newMemberId,
+                  generationId = group.generationId,
+                  protocolType = group.protocolType,
+                  protocolName = group.protocolName,
+                  leaderId = currentLeader,
+                  skipAssignment = false,
+                  error = Errors.NONE
+                ))
+              }
             }
           }, requestLocal)
         } else {

Reply via email to