This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new c05c5a9fdf8 MINOR: Make online downgrade failure logs less noisy and 
update the timeouts scheduled in `convertToConsumerGroup` (#16290)
c05c5a9fdf8 is described below

commit c05c5a9fdf82750ae19f36a854ecc8ef5db44b4d
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Thu Jun 13 05:11:01 2024 -0400

    MINOR: Make online downgrade failure logs less noisy and update the 
timeouts scheduled in `convertToConsumerGroup` (#16290)
    
    This patch:
    - changes the order of the checks in `validateOnlineDowngrade`, so that 
`online downgrade is disabled` is logged only when the last member using the 
consumer protocol leaves, the group still has classic member(s), and the policy 
doesn't allow downgrade.
    - changes the session timeout in `convertToConsumerGroup` from 
`consumerGroupSessionTimeoutMs` to 
`member.classicProtocolSessionTimeout().get()`.
    
    Reviewers: David Jacot <[email protected]>
---
 .../kafka/coordinator/group/GroupMetadataManager.java    | 16 +++++++---------
 .../coordinator/group/GroupMetadataManagerTest.java      | 10 +++++-----
 2 files changed, 12 insertions(+), 14 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 6463d78d9ce..9afb8d7791d 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -794,18 +794,16 @@ public class GroupMetadataManager {
      * @return A boolean indicating whether it's valid to online downgrade the 
consumer group.
      */
     private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, 
String memberId) {
-        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
-            log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
-                consumerGroup.groupId());
-            return false;
-        } else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
-            log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
-                consumerGroup.groupId());
+        if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
             return false;
         } else if (consumerGroup.numMembers() <= 1) {
             log.debug("Skip downgrading the consumer group {} to classic group 
because it's empty.",
                 consumerGroup.groupId());
             return false;
+        } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
         } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
             log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
                 consumerGroup.groupId());
@@ -927,8 +925,8 @@ public class GroupMetadataManager {
 
         // Create the session timeouts for the new members. If the conversion 
fails, the group will remain a
         // classic group, thus these timers will fail the group type check and 
do nothing.
-        consumerGroup.members().forEach((memberId, __) ->
-            scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), 
memberId)
+        consumerGroup.members().forEach((memberId, member) ->
+            scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), 
memberId, member.classicProtocolSessionTimeout().get())
         );
 
         return consumerGroup;
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index ad68c96aaab..248f5507a06 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -9630,7 +9630,7 @@ public class GroupMetadataManagerTest {
 
         assertRecordsEquals(expectedRecords, result.records());
 
-        context.assertSessionTimeout(groupId, memberId1, 45000);
+        context.assertSessionTimeout(groupId, memberId1, 
expectedMember1.classicProtocolSessionTimeout().get());
         context.assertSessionTimeout(groupId, memberId2, 45000);
 
         // Simulate a failed replay. The context is rolled back and the group 
is converted back to the classic group.
@@ -9876,8 +9876,8 @@ public class GroupMetadataManagerTest {
         assertTrue(joinResult.joinFuture.isDone());
         assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
joinResult.joinFuture.get().errorCode());
 
-        context.assertSessionTimeout(groupId, memberId1, 45000);
-        context.assertSessionTimeout(groupId, memberId2, 45000);
+        context.assertSessionTimeout(groupId, memberId1, 
expectedMember1.classicProtocolSessionTimeout().get());
+        context.assertSessionTimeout(groupId, memberId2, 
expectedMember2.classicProtocolSessionTimeout().get());
         context.assertSessionTimeout(groupId, memberId3, 45000);
 
         // Simulate a failed replay. The context is rolled back and the group 
is converted back to the classic group.
@@ -10141,8 +10141,8 @@ public class GroupMetadataManagerTest {
         assertTrue(syncResult.syncFuture.isDone());
         assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
syncResult.syncFuture.get().errorCode());
 
-        context.assertSessionTimeout(groupId, memberId1, 45000);
-        context.assertSessionTimeout(groupId, memberId2, 45000);
+        context.assertSessionTimeout(groupId, memberId1, 
expectedMember1.classicProtocolSessionTimeout().get());
+        context.assertSessionTimeout(groupId, memberId2, 
expectedMember2.classicProtocolSessionTimeout().get());
         context.assertSessionTimeout(groupId, memberId3, 45000);
 
         // Simulate a failed replay. The context is rolled back and the group 
is converted back to the classic group.

Reply via email to