This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9bc48af1c11 MINOR: Add type check to classic group timeout operations 
(#15587)
9bc48af1c11 is described below

commit 9bc48af1c119a951f398010dbfc416754cc76fe1
Author: Dongnuo Lyu <139248811+dongnuo...@users.noreply.github.com>
AuthorDate: Wed Apr 10 03:36:49 2024 -0400

    MINOR: Add type check to classic group timeout operations (#15587)
    
    When implementing the group type conversion from a classic group to a 
consumer group, if the replay of conversion records fails, the group should be 
reverted, including its timeouts.
    
    A possible solution is to keep all the classic group timeouts and add a 
type check to the timeout operations. If the group is successfully upgraded, it 
will fail the type check and its timeout operations will run without doing 
anything; if the group upgrade fails, the group map will be reverted and the 
timeout operations will be executed as is.
    
    We already have a group type check in the consumer group timeout operations. 
This patch adds a similar type check to the classic group timeout operations.
    
    Reviewers: David Jacot <dja...@confluent.io>
---
 .../coordinator/group/GroupMetadataManager.java    | 68 +++++++++++++++++-----
 1 file changed, 54 insertions(+), 14 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 9cfe8f617e6..d12ed6a9dea 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2286,7 +2286,7 @@ public class GroupMetadataManager {
                 request.sessionTimeoutMs(),
                 TimeUnit.MILLISECONDS,
                 false,
-                () -> expireClassicGroupMemberHeartbeat(group, newMemberId)
+                () -> expireClassicGroupMemberHeartbeat(group.groupId(), 
newMemberId)
             );
 
             responseFuture.complete(new JoinGroupResponseData()
@@ -2457,6 +2457,22 @@ public class GroupMetadataManager {
         return EMPTY_RESULT;
     }
 
+    /**
+     * An overload of {@link 
GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
+     * timeout operation. It additionally looks up the group by the id and 
checks the group type.
+     * completeClassicGroupJoin will only be called if the group is CLASSIC.
+     */
+    private CoordinatorResult<Void, Record> completeClassicGroupJoin(String 
groupId) {
+        ClassicGroup group;
+        try {
+            group = getOrMaybeCreateClassicGroup(groupId, false);
+        } catch (UnknownMemberIdException | GroupIdNotFoundException 
exception) {
+            log.debug("Cannot find the group, skipping rebalance stage.", 
exception);
+            return EMPTY_RESULT;
+        }
+        return completeClassicGroupJoin(group);
+    }
+
     /**
      * Complete the join group phase. Remove all dynamic members that have not 
rejoined
      * during this stage and proceed with the next generation for this group. 
The generation id
@@ -2504,7 +2520,7 @@ public class GroupMetadataManager {
                 group.rebalanceTimeoutMs(),
                 TimeUnit.MILLISECONDS,
                 false,
-                () -> completeClassicGroupJoin(group)
+                () -> completeClassicGroupJoin(group.groupId())
             );
 
             return EMPTY_RESULT;
@@ -2575,22 +2591,31 @@ public class GroupMetadataManager {
             group.rebalanceTimeoutMs(),
             TimeUnit.MILLISECONDS,
             false,
-            () -> expirePendingSync(group, group.generationId()));
+            () -> expirePendingSync(group.groupId(), group.generationId()));
     }
 
     /**
      * Invoked when the heartbeat operation is expired from the timer. 
Possibly remove the member and
      * try complete the join phase.
      *
-     * @param group     The group.
+     * @param groupId   The group id.
      * @param memberId  The member id.
      *
      * @return The coordinator result that will be appended to the log.
      */
     private CoordinatorResult<Void, Record> expireClassicGroupMemberHeartbeat(
-        ClassicGroup group,
+        String groupId,
         String memberId
     ) {
+        ClassicGroup group;
+        try {
+            group = getOrMaybeCreateClassicGroup(groupId, false);
+        } catch (UnknownMemberIdException | GroupIdNotFoundException 
exception) {
+            log.debug("Received notification of heartbeat expiration for 
member {} after group {} " +
+                "had already been deleted or upgraded.", memberId, groupId);
+            return EMPTY_RESULT;
+        }
+
         if (group.isInState(DEAD)) {
             log.info("Received notification of heartbeat expiration for member 
{} after group {} " +
                     "had already been unloaded or deleted.",
@@ -2805,7 +2830,7 @@ public class GroupMetadataManager {
                 delayMs,
                 TimeUnit.MILLISECONDS,
                 false,
-                () -> tryCompleteInitialRebalanceElseSchedule(group, delayMs, 
remainingMs)
+                () -> tryCompleteInitialRebalanceElseSchedule(group.groupId(), 
delayMs, remainingMs)
             );
         }
 
@@ -2837,7 +2862,7 @@ public class GroupMetadataManager {
                 group.rebalanceTimeoutMs(),
                 TimeUnit.MILLISECONDS,
                 false,
-                () -> completeClassicGroupJoin(group)
+                () -> completeClassicGroupJoin(group.groupId())
             );
             return EMPTY_RESULT;
         }
@@ -2847,15 +2872,23 @@ public class GroupMetadataManager {
      * Try to complete the join phase of the initial rebalance.
      * Otherwise, extend the rebalance.
      *
-     * @param group The group under initial rebalance.
+     * @param groupId The group under initial rebalance.
      *
      * @return The coordinator result that will be appended to the log.
      */
     private CoordinatorResult<Void, Record> 
tryCompleteInitialRebalanceElseSchedule(
-        ClassicGroup group,
+        String groupId,
         int delayMs,
         int remainingMs
     ) {
+        ClassicGroup group;
+        try {
+            group = getOrMaybeCreateClassicGroup(groupId, false);
+        } catch (UnknownMemberIdException | GroupIdNotFoundException 
exception) {
+            log.debug("Cannot find the group, skipping the initial rebalance 
stage.", exception);
+            return EMPTY_RESULT;
+        }
+
         if (group.newMemberAdded() && remainingMs != 0) {
             // A new member was added. Extend the delay.
             group.setNewMemberAdded(false);
@@ -2867,7 +2900,7 @@ public class GroupMetadataManager {
                 newDelayMs,
                 TimeUnit.MILLISECONDS,
                 false,
-                () -> tryCompleteInitialRebalanceElseSchedule(group, 
newDelayMs, newRemainingMs)
+                () -> tryCompleteInitialRebalanceElseSchedule(group.groupId(), 
newDelayMs, newRemainingMs)
             );
         } else {
             // No more time remaining. Complete the join phase.
@@ -2979,7 +3012,7 @@ public class GroupMetadataManager {
             timeoutMs,
             TimeUnit.MILLISECONDS,
             false,
-            () -> expireClassicGroupMemberHeartbeat(group, member.memberId()));
+            () -> expireClassicGroupMemberHeartbeat(group.groupId(), 
member.memberId()));
     }
 
     /**
@@ -2996,15 +3029,23 @@ public class GroupMetadataManager {
     /**
      * Expire pending sync.
      *
-     * @param group           The group.
+     * @param groupId         The group id.
      * @param generationId    The generation when the pending sync was 
originally scheduled.
      *
      * @return The coordinator result that will be appended to the log.
      * */
     private CoordinatorResult<Void, Record> expirePendingSync(
-        ClassicGroup group,
+        String groupId,
         int generationId
     ) {
+        ClassicGroup group;
+        try {
+            group = getOrMaybeCreateClassicGroup(groupId, false);
+        } catch (UnknownMemberIdException | GroupIdNotFoundException 
exception) {
+            log.debug("Received notification of sync expiration for an unknown 
classic group {}.", groupId);
+            return EMPTY_RESULT;
+        }
+
         if (generationId != group.generationId()) {
             log.error("Received unexpected notification of sync expiration for 
{} with an old " +
                 "generation {} while the group has {}.", group.groupId(), 
generationId, group.generationId());
@@ -3027,7 +3068,6 @@ public class GroupMetadataManager {
                 }
             }
         }
-
         return EMPTY_RESULT;
     }
 

Reply via email to