This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cbc02e006d0 KAFKA-16106; Schedule timeout task to refresh classic 
group size metric (#17325)
cbc02e006d0 is described below

commit cbc02e006d0f1805ff14c843b9c80ce9d601c23b
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Fri Oct 4 03:31:06 2024 -0400

    KAFKA-16106; Schedule timeout task to refresh classic group size metric 
(#17325)
    
    In the existing implementation, If an operation modifying the classic group 
state fails, the group reverts but the group size counter does not. This 
creates an inconsistency between the group size metric and the actual group 
size.
    
    Considering that It will be complicated to rely on the appendFuture to 
revert the metrics upon the operation failure, this PR introduces a new 
implementation. A timeout task will periodically refresh the metrics based on 
the current groups soft state. The refreshing interval is hardcoded to 60 
seconds.
    
    Reviewers: David Jacot <[email protected]>
---
 .../coordinator/group/GroupCoordinatorShard.java   | 38 +++++++++
 .../coordinator/group/GroupMetadataManager.java    | 39 +++++----
 .../org/apache/kafka/coordinator/group/Utils.java  | 16 ++++
 .../coordinator/group/classic/ClassicGroup.java    | 15 +---
 .../metrics/GroupCoordinatorMetricsShard.java      | 85 +++----------------
 .../group/GroupCoordinatorRecordHelpersTest.java   | 14 +---
 .../group/GroupCoordinatorShardTest.java           | 43 ++++++++++
 .../group/GroupMetadataManagerTest.java            | 96 ++++++++++------------
 .../group/classic/ClassicGroupTest.java            | 41 +--------
 .../metrics/GroupCoordinatorMetricsShardTest.java  | 62 --------------
 .../group/metrics/GroupCoordinatorMetricsTest.java | 19 +++--
 11 files changed, 195 insertions(+), 273 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index c6eced2158e..70b8d88fa04 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -248,6 +248,18 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
      */
     static final String GROUP_EXPIRATION_KEY = "expire-group-metadata";
 
+    /**
+     * The classic group size counter key to schedule a timer task.
+     *
+     * Visible for testing.
+     */
+    static final String CLASSIC_GROUP_SIZE_COUNTER_KEY = 
"classic-group-size-counter";
+
+    /**
+     * Hardcoded default value of the interval to update the classic group 
size counter.
+     */
+    static final int DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS = 60 * 1000;
+
     /**
      * The logger.
      */
@@ -677,6 +689,30 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         return new CoordinatorResult<>(records, false);
     }
 
+    /**
+     * Schedules (or reschedules) the group size counter for the classic 
groups.
+     */
+    private void scheduleClassicGroupSizeCounter() {
+        timer.schedule(
+            CLASSIC_GROUP_SIZE_COUNTER_KEY,
+            DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
+            TimeUnit.MILLISECONDS,
+            true,
+            () -> {
+                groupMetadataManager.updateClassicGroupSizeCounter();
+                scheduleClassicGroupSizeCounter();
+                return GroupMetadataManager.EMPTY_RESULT;
+            }
+        );
+    }
+
+    /**
+     * Cancels the group size counter for the classic groups.
+     */
+    private void cancelClassicGroupSizeCounter() {
+        timer.cancel(CLASSIC_GROUP_SIZE_COUNTER_KEY);
+    }
+
     /**
      * The coordinator has been loaded. This is used to apply any
      * post loading operations (e.g. registering timers).
@@ -692,6 +728,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
 
         groupMetadataManager.onLoaded();
         scheduleGroupMetadataExpiration();
+        scheduleClassicGroupSizeCounter();
     }
 
     @Override
@@ -699,6 +736,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         timer.cancel(GROUP_EXPIRATION_KEY);
         coordinatorMetrics.deactivateMetricsShard(metricsShard);
         groupMetadataManager.onUnloaded();
+        cancelClassicGroupSizeCounter();
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 384c5295c97..39ac3c2e2f8 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -868,7 +868,6 @@ public class GroupMetadataManager {
             ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
             groups.put(groupId, consumerGroup);
             metrics.onConsumerGroupStateTransition(null, 
consumerGroup.state());
-            metrics.onClassicGroupStateTransition(EMPTY, null);
             return consumerGroup;
         } else {
             throw new IllegalStateException(String.format("Group %s is not a 
consumer group", groupId));
@@ -899,9 +898,8 @@ public class GroupMetadataManager {
         }
 
         if (group == null) {
-            ClassicGroup classicGroup = new ClassicGroup(logContext, groupId, 
ClassicGroupState.EMPTY, time, metrics);
+            ClassicGroup classicGroup = new ClassicGroup(logContext, groupId, 
ClassicGroupState.EMPTY, time);
             groups.put(groupId, classicGroup);
-            metrics.onClassicGroupStateTransition(null, 
classicGroup.currentState());
             return classicGroup;
         } else {
             if (group.type() == CLASSIC) {
@@ -1091,7 +1089,6 @@ public class GroupMetadataManager {
                 leavingMemberId,
                 logContext,
                 time,
-                metrics,
                 metadataImage
             );
         } catch (SchemaException e) {
@@ -1108,17 +1105,11 @@ public class GroupMetadataManager {
         // Set the appendFuture to prevent the records from being replayed.
         removeGroup(consumerGroup.groupId());
         groups.put(consumerGroup.groupId(), classicGroup);
-        metrics.onClassicGroupStateTransition(null, 
classicGroup.currentState());
 
         classicGroup.allMembers().forEach(member -> 
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
         prepareRebalance(classicGroup, String.format("Downgrade group %s from 
consumer to classic.", classicGroup.groupId()));
 
-        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
-        appendFuture.exceptionally(__ -> {
-            metrics.onClassicGroupStateTransition(classicGroup.currentState(), 
null);
-            return null;
-        });
-        return new CoordinatorResult<>(records, response, appendFuture, false);
+        return new CoordinatorResult<>(records, response, null, false);
     }
 
     /**
@@ -1204,8 +1195,7 @@ public class GroupMetadataManager {
                     
metrics.onConsumerGroupStateTransition(consumerGroup.state(), null);
                     break;
                 case CLASSIC:
-                    ClassicGroup classicGroup = (ClassicGroup) group;
-                    
metrics.onClassicGroupStateTransition(classicGroup.currentState(), null);
+                    // The classic group size counter is implemented as 
scheduled task.
                     break;
                 case SHARE:
                     // Nothing for now, but we may want to add metrics in the 
future.
@@ -3742,11 +3732,25 @@ public class GroupMetadataManager {
         });
     }
 
+    /**
+     * Counts and updates the number of classic groups in different states.
+     */
+    public void updateClassicGroupSizeCounter() {
+        Map<ClassicGroupState, Long> groupSizeCounter = new HashMap<>();
+        groups.forEach((__, group) -> {
+            if (group.type() == CLASSIC) {
+                groupSizeCounter.compute(((ClassicGroup) 
group).currentState(), Utils::incValue);
+            }
+        });
+        metrics.setClassicGroupGauges(groupSizeCounter);
+    }
+
     /**
      * The coordinator has been loaded. Session timeouts are registered
      * for all members.
      */
     public void onLoaded() {
+        Map<ClassicGroupState, Long> classicGroupSizeCounter = new HashMap<>();
         groups.forEach((groupId, group) -> {
             switch (group.type()) {
                 case CONSUMER:
@@ -3780,6 +3784,8 @@ public class GroupMetadataManager {
                             " (size " + classicGroup.numMembers() + ") is over 
capacity " + classicGroupMaxSize +
                             ". Rebalancing in order to give a chance for 
consumers to commit offsets");
                     }
+
+                    
classicGroupSizeCounter.compute(classicGroup.currentState(), Utils::incValue);
                     break;
 
                 case SHARE:
@@ -3791,6 +3797,7 @@ public class GroupMetadataManager {
                     break;
             }
         });
+        metrics.setClassicGroupGauges(classicGroupSizeCounter);
     }
 
     /**
@@ -3901,7 +3908,6 @@ public class GroupMetadataManager {
                 groupId,
                 loadedMembers.isEmpty() ? EMPTY : STABLE,
                 time,
-                metrics,
                 value.generation(),
                 protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType),
                 Optional.ofNullable(value.protocol()),
@@ -3910,10 +3916,7 @@ public class GroupMetadataManager {
             );
 
             loadedMembers.forEach(member -> classicGroup.add(member, null));
-            Group prevGroup = groups.put(groupId, classicGroup);
-            if (prevGroup == null) {
-                metrics.onClassicGroupStateTransition(null, 
classicGroup.currentState());
-            }
+            groups.put(groupId, classicGroup);
 
             classicGroup.setSubscribedTopics(
                 classicGroup.computeSubscribedTopics()
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
index 91b498f62cc..5e32d17ae50 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
@@ -102,6 +102,22 @@ public class Utils {
         return value == null ? 1 : value + 1;
     }
 
+    /**
+     * Decrements value by 1; returns null when reaching zero. This helper is
+     * meant to be used with Map#compute.
+     */
+    public static Long decValue(Object key, Long value) {
+        if (value == null) return null;
+        return value == 1 ? null : value - 1;
+    }
+
+    /**
+     * Increments value by 1; This helper is meant to be used with Map#compute.
+     */
+    public static Long incValue(Object key, Long value) {
+        return value == null ? 1 : value + 1;
+    }
+
     /**
      * @return An Optional containing the provided string if it is not null 
and not empty,
      *         otherwise an empty Optional.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
index 907380289e6..265e5ea4453 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
@@ -38,7 +38,6 @@ import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.MetadataVersion;
@@ -182,24 +181,17 @@ public class ClassicGroup implements Group {
      */
     private boolean newMemberAdded = false;
 
-    /**
-     * Coordinator metrics.
-     */
-    private final GroupCoordinatorMetricsShard metrics;
-
     public ClassicGroup(
         LogContext logContext,
         String groupId,
         ClassicGroupState initialState,
-        Time time,
-        GroupCoordinatorMetricsShard metrics
+        Time time
     ) {
         this(
             logContext,
             groupId,
             initialState,
             time,
-            metrics,
             0,
             Optional.empty(),
             Optional.empty(),
@@ -213,7 +205,6 @@ public class ClassicGroup implements Group {
         String groupId,
         ClassicGroupState initialState,
         Time time,
-        GroupCoordinatorMetricsShard metrics,
         int generationId,
         Optional<String> protocolType,
         Optional<String> protocolName,
@@ -226,7 +217,6 @@ public class ClassicGroup implements Group {
         this.state = Objects.requireNonNull(initialState);
         this.previousState = DEAD;
         this.time = Objects.requireNonNull(time);
-        this.metrics = Objects.requireNonNull(metrics);
         this.generationId = generationId;
         this.protocolType = protocolType;
         this.protocolName = protocolName;
@@ -1013,7 +1003,6 @@ public class ClassicGroup implements Group {
         previousState = state;
         state = groupState;
         currentStateTimestamp = Optional.of(time.milliseconds());
-        metrics.onClassicGroupStateTransition(previousState, state);
     }
 
     /**
@@ -1378,7 +1367,6 @@ public class ClassicGroup implements Group {
         String leavingMemberId,
         LogContext logContext,
         Time time,
-        GroupCoordinatorMetricsShard metrics,
         MetadataImage metadataImage
     ) {
         ClassicGroup classicGroup = new ClassicGroup(
@@ -1386,7 +1374,6 @@ public class ClassicGroup implements Group {
             consumerGroup.groupId(),
             ClassicGroupState.STABLE,
             time,
-            metrics,
             consumerGroup.groupEpoch(),
             Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
             Optional.empty(),
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
index 0892b57436b..219a4f0a22c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
@@ -29,6 +29,7 @@ import org.apache.kafka.timeline.TimelineLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
@@ -65,7 +66,7 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
     /**
      * Classic group size gauge counters keyed by the metric name.
      */
-    private final Map<ClassicGroupState, AtomicLong> classicGroupGauges;
+    private volatile Map<ClassicGroupState, Long> classicGroupGauges;
 
     /**
      * Consumer group size gauge counters keyed by the metric name.
@@ -106,13 +107,7 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
         numOffsetsTimelineGaugeCounter = new TimelineGaugeCounter(new 
TimelineLong(snapshotRegistry), new AtomicLong(0));
         numClassicGroupsTimelineCounter = new TimelineGaugeCounter(new 
TimelineLong(snapshotRegistry), new AtomicLong(0));
 
-        this.classicGroupGauges = Utils.mkMap(
-            Utils.mkEntry(ClassicGroupState.PREPARING_REBALANCE, new 
AtomicLong(0)),
-            Utils.mkEntry(ClassicGroupState.COMPLETING_REBALANCE, new 
AtomicLong(0)),
-            Utils.mkEntry(ClassicGroupState.STABLE, new AtomicLong(0)),
-            Utils.mkEntry(ClassicGroupState.DEAD, new AtomicLong(0)),
-            Utils.mkEntry(ClassicGroupState.EMPTY, new AtomicLong(0))
-        );
+        this.classicGroupGauges = Collections.emptyMap();
 
         this.consumerGroupGauges = Utils.mkMap(
             Utils.mkEntry(ConsumerGroupState.EMPTY,
@@ -140,13 +135,6 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
         this.topicPartition = Objects.requireNonNull(topicPartition);
     }
 
-    public void incrementNumClassicGroups(ClassicGroupState state) {
-        AtomicLong counter = classicGroupGauges.get(state);
-        if (counter != null) {
-            counter.incrementAndGet();
-        }
-    }
-
     /**
      * Increment the number of offsets.
      */
@@ -179,18 +167,6 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
         }
     }
 
-    /**
-     * Decrement the number of classic groups.
-     *
-     * @param state the classic group state.
-     */
-    public void decrementNumClassicGroups(ClassicGroupState state) {
-        AtomicLong counter = classicGroupGauges.get(state);
-        if (counter != null) {
-            counter.decrementAndGet();
-        }
-    }
-
     /**
      * Decrement the number of consumer groups.
      *
@@ -220,9 +196,9 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
      * @return   The number of classic groups in `state`.
      */
     public long numClassicGroups(ClassicGroupState state) {
-        AtomicLong counter = classicGroupGauges.get(state);
+        Long counter = classicGroupGauges.get(state);
         if (counter != null) {
-            return counter.get();
+            return counter;
         }
         return 0L;
     }
@@ -232,7 +208,7 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
      */
     public long numClassicGroups() {
         return classicGroupGauges.values().stream()
-                                 .mapToLong(AtomicLong::get).sum();
+                                 .mapToLong(Long::longValue).sum();
     }
 
     /**
@@ -309,53 +285,14 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
     }
 
     /**
-     * Called when a classic group's state has changed. Increment/decrement
-     * the counter accordingly.
+     * Sets the classicGroupGauges.
      *
-     * @param oldState The previous state. null value means that it's a new 
group.
-     * @param newState The next state. null value means that the group has 
been removed.
+     * @param classicGroupGauges The new classicGroupGauges.
      */
-    public void onClassicGroupStateTransition(
-        ClassicGroupState oldState,
-        ClassicGroupState newState
+    public void setClassicGroupGauges(
+        Map<ClassicGroupState, Long> classicGroupGauges
     ) {
-        if (newState != null) {
-            switch (newState) {
-                case PREPARING_REBALANCE:
-                    
incrementNumClassicGroups(ClassicGroupState.PREPARING_REBALANCE);
-                    break;
-                case COMPLETING_REBALANCE:
-                    
incrementNumClassicGroups(ClassicGroupState.COMPLETING_REBALANCE);
-                    break;
-                case STABLE:
-                    incrementNumClassicGroups(ClassicGroupState.STABLE);
-                    break;
-                case DEAD:
-                    incrementNumClassicGroups(ClassicGroupState.DEAD);
-                    break;
-                case EMPTY:
-                    incrementNumClassicGroups(ClassicGroupState.EMPTY);
-            }
-        }
-
-        if (oldState != null) {
-            switch (oldState) {
-                case PREPARING_REBALANCE:
-                    
decrementNumClassicGroups(ClassicGroupState.PREPARING_REBALANCE);
-                    break;
-                case COMPLETING_REBALANCE:
-                    
decrementNumClassicGroups(ClassicGroupState.COMPLETING_REBALANCE);
-                    break;
-                case STABLE:
-                    decrementNumClassicGroups(ClassicGroupState.STABLE);
-                    break;
-                case DEAD:
-                    decrementNumClassicGroups(ClassicGroupState.DEAD);
-                    break;
-                case EMPTY:
-                    decrementNumClassicGroups(ClassicGroupState.EMPTY);
-            }
-        }
+        this.classicGroupGauges = classicGroupGauges;
     }
 
     /**
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
index 38e2b9d68cf..e0a9446ce6f 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
@@ -43,7 +43,6 @@ import 
org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.MemberState;
 import org.apache.kafka.coordinator.group.modern.TopicMetadata;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
@@ -89,7 +88,6 @@ import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.mock;
 
 public class GroupCoordinatorRecordHelpersTest {
 
@@ -514,8 +512,7 @@ public class GroupCoordinatorRecordHelpersTest {
             new LogContext(),
             "group-id",
             ClassicGroupState.PREPARING_REBALANCE,
-            time,
-            mock(GroupCoordinatorMetricsShard.class)
+            time
         );
 
         Map<String, byte[]> assignment = new HashMap<>();
@@ -585,8 +582,7 @@ public class GroupCoordinatorRecordHelpersTest {
             new LogContext(),
             "group-id",
             ClassicGroupState.PREPARING_REBALANCE,
-            time,
-            mock(GroupCoordinatorMetricsShard.class)
+            time
         );
 
         expectedMembers.forEach(member -> {
@@ -637,8 +633,7 @@ public class GroupCoordinatorRecordHelpersTest {
             new LogContext(),
             "group-id",
             ClassicGroupState.PREPARING_REBALANCE,
-            time,
-            mock(GroupCoordinatorMetricsShard.class)
+            time
         );
 
         expectedMembers.forEach(member -> {
@@ -697,8 +692,7 @@ public class GroupCoordinatorRecordHelpersTest {
             new LogContext(),
             "group-id",
             ClassicGroupState.PREPARING_REBALANCE,
-            time,
-            mock(GroupCoordinatorMetricsShard.class)
+            time
         );
 
         group.initNextGeneration();
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 9cf3de6f821..f8bfc5981cc 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -75,6 +75,8 @@ import java.util.List;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static 
org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorShard.CLASSIC_GROUP_SIZE_COUNTER_KEY;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorShard.DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_EXPIRATION_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -1001,6 +1003,47 @@ public class GroupCoordinatorShardTest {
         verify(groupMetadataManager, 
times(0)).maybeDeleteGroup(eq("other-group-id"), any());
     }
 
+    @Test
+    public void testScheduleClassicGroupSizeCounter() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
+        MockTime time = new MockTime();
+        MockCoordinatorTimer<Void, CoordinatorRecord> timer = new 
MockCoordinatorTimer<>(time);
+        GroupCoordinatorConfig config = mock(GroupCoordinatorConfig.class);
+        when(config.offsetsRetentionCheckIntervalMs()).thenReturn(60 * 60 * 
1000L);
+
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            timer,
+            config,
+            coordinatorMetrics,
+            metricsShard
+        );
+        coordinator.onLoaded(MetadataImage.EMPTY);
+
+        // The classic group size counter is scheduled.
+        assertEquals(
+            DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
+            timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs - 
time.milliseconds()
+        );
+
+        // Advance the timer to trigger the update.
+        time.sleep(DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS + 1);
+        timer.poll();
+        verify(groupMetadataManager, times(1)).updateClassicGroupSizeCounter();
+
+        // The classic group size counter is scheduled.
+        assertEquals(
+            DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
+            timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs - 
time.milliseconds()
+        );
+    }
+
     @ParameterizedTest
     @EnumSource(value = TransactionResult.class)
     public void testReplayEndTransactionMarker(TransactionResult result) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 653662b859b..0239d5f6304 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -75,7 +75,6 @@ import 
org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
 import org.apache.kafka.coordinator.group.modern.MemberState;
@@ -152,6 +151,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -3576,6 +3576,45 @@ public class GroupMetadataManagerTest {
         
assertNotNull(context.timer.timeout(consumerGroupRebalanceTimeoutKey("foo", 
"foo-1")));
     }
 
+    @Test
+    public void testUpdateClassicGroupSizeCounter() {
+        String groupId0 = "group-0";
+        String groupId1 = "group-1";
+        String groupId2 = "group-2";
+        String groupId3 = "group-3";
+        String groupId4 = "group-4";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId0, 10))
+            .build();
+
+        ClassicGroup group1 = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId1, true);
+        ClassicGroup group2 = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId2, true);
+        ClassicGroup group3 = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId3, true);
+        ClassicGroup group4 = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId4, true);
+
+        context.groupMetadataManager.updateClassicGroupSizeCounter();
+        verify(context.metrics, times(1)).setClassicGroupGauges(eq(Utils.mkMap(
+            Utils.mkEntry(ClassicGroupState.EMPTY, 4L)
+        )));
+
+        group1.transitionTo(PREPARING_REBALANCE);
+        group2.transitionTo(PREPARING_REBALANCE);
+        group2.transitionTo(COMPLETING_REBALANCE);
+        group3.transitionTo(PREPARING_REBALANCE);
+        group3.transitionTo(COMPLETING_REBALANCE);
+        group3.transitionTo(STABLE);
+        group4.transitionTo(DEAD);
+
+        context.groupMetadataManager.updateClassicGroupSizeCounter();
+        verify(context.metrics, times(1)).setClassicGroupGauges(eq(Utils.mkMap(
+            Utils.mkEntry(ClassicGroupState.PREPARING_REBALANCE, 1L),
+            Utils.mkEntry(ClassicGroupState.COMPLETING_REBALANCE, 1L),
+            Utils.mkEntry(ClassicGroupState.STABLE, 1L),
+            Utils.mkEntry(ClassicGroupState.DEAD, 1L)
+        )));
+    }
+
     @Test
     public void testGenerateRecordsOnNewClassicGroup() throws Exception {
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
@@ -3678,11 +3717,6 @@ public class GroupMetadataManagerTest {
             "group-id",
             STABLE,
             context.time,
-            new GroupCoordinatorMetricsShard(
-                context.snapshotRegistry,
-                Collections.emptyMap(),
-                new TopicPartition("__consumer_offsets", 0)
-            ),
             1,
             Optional.of("consumer"),
             Optional.of("range"),
@@ -9501,30 +9535,6 @@ public class GroupMetadataManagerTest {
         verify(context.metrics).record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
     }
 
-    @Test
-    public void testOnClassicGroupStateTransition() {
-        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
-            .build();
-
-        // Creating a classic group should increment metric.
-        ClassicGroup group = context.createClassicGroup("group-id");
-        verify(context.metrics, times(1)).onClassicGroupStateTransition(null, 
EMPTY);
-
-        // Replaying a new group should not increment metric as the group was 
already created.
-        
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, 
Collections.emptyMap(), MetadataVersion.LATEST_PRODUCTION));
-        verify(context.metrics, times(1)).onClassicGroupStateTransition(null, 
EMPTY);
-
-        // Loading a tombstone should remove group and decrement metric.
-        context.createClassicGroup("group-id");
-        
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
-        verify(context.metrics, times(1)).onClassicGroupStateTransition(EMPTY, 
null);
-        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.group("group-id"));
-
-        // Replaying a tombstone for a group that has already been deleted 
should not decrement metric.
-        
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
-        verify(context.metrics, times(1)).onClassicGroupStateTransition(EMPTY, 
null);
-    }
-
     @Test
     public void testOnClassicGroupStateTransitionOnLoading() {
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
@@ -9534,8 +9544,7 @@ public class GroupMetadataManagerTest {
             new LogContext(),
             "group-id",
             EMPTY,
-            context.time,
-            context.metrics
+            context.time
         );
 
         // Even if there are more group metadata records loaded than tombstone 
records, the last replayed record
@@ -9546,9 +9555,6 @@ public class GroupMetadataManagerTest {
         IntStream.range(0, 4).forEach(__ ->
             
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id"))
         );
-
-        verify(context.metrics, times(1)).onClassicGroupStateTransition(null, 
EMPTY);
-        verify(context.metrics, times(1)).onClassicGroupStateTransition(EMPTY, 
null);
     }
 
     @Test
@@ -9609,8 +9615,7 @@ public class GroupMetadataManagerTest {
             new LogContext(),
             classicGroupId,
             EMPTY,
-            context.time,
-            context.metrics
+            context.time
         );
         
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup,
 classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
 
@@ -9638,8 +9643,7 @@ public class GroupMetadataManagerTest {
             new LogContext(),
             classicGroupId,
             EMPTY,
-            context.time,
-            context.metrics
+            context.time
         );
         
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup,
 classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
 
@@ -10883,7 +10887,6 @@ public class GroupMetadataManagerTest {
             groupId,
             STABLE,
             context.time,
-            context.metrics,
             10,
             Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
             Optional.of("range"),
@@ -10927,7 +10930,6 @@ public class GroupMetadataManagerTest {
         assertRecordsEquals(expectedRecords.subList(7, 10), 
result.records().subList(7, 10));
 
         verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
 null);
-        verify(context.metrics, times(1)).onClassicGroupStateTransition(null, 
STABLE);
 
         // The new classic member 1 has a heartbeat timeout.
         ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = 
context.timer.timeout(
@@ -10943,14 +10945,6 @@ public class GroupMetadataManagerTest {
         // A new rebalance is triggered.
         ClassicGroup classicGroup = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
         assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
-
-        // Simulate a failed write to the log.
-        result.appendFuture().completeExceptionally(new 
NotLeaderOrFollowerException());
-        context.rollback();
-
-        // The group is reverted back to the consumer group.
-        assertEquals(consumerGroup, 
context.groupMetadataManager.consumerGroup(groupId));
-        verify(context.metrics, 
times(1)).onClassicGroupStateTransition(PREPARING_REBALANCE, null);
     }
 
     @Test
@@ -11076,7 +11070,6 @@ public class GroupMetadataManagerTest {
             groupId,
             STABLE,
             context.time,
-            context.metrics,
             10,
             Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
             Optional.of("range"),
@@ -11119,7 +11112,6 @@ public class GroupMetadataManagerTest {
         assertRecordsEquals(expectedRecords.subList(7, 10), 
timeout.result.records().subList(7, 10));
 
         verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
 null);
-        verify(context.metrics, times(1)).onClassicGroupStateTransition(null, 
STABLE);
 
         // The new classic member 1 has a heartbeat timeout.
         ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = 
context.timer.timeout(
@@ -11288,7 +11280,6 @@ public class GroupMetadataManagerTest {
             groupId,
             STABLE,
             context.time,
-            context.metrics,
             11,
             Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
             Optional.of("range"),
@@ -11332,7 +11323,6 @@ public class GroupMetadataManagerTest {
         assertRecordsEquals(expectedRecords.subList(7, 10), 
timeout.result.records().subList(7, 10));
 
         verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.RECONCILING,
 null);
-        verify(context.metrics, times(1)).onClassicGroupStateTransition(null, 
STABLE);
 
         // The new classic member 1 has a heartbeat timeout.
         ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = 
context.timer.timeout(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index 1c5dfaa1054..1d496c4b04b 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.coordinator.group.classic;
 
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
@@ -39,8 +38,6 @@ import 
org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 import org.apache.kafka.coordinator.group.OffsetAndMetadata;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
-import org.apache.kafka.timeline.SnapshotRegistry;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -70,10 +67,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 public class ClassicGroupTest {
     private final String protocolType = "consumer";
@@ -84,17 +77,12 @@ public class ClassicGroupTest {
     private final int rebalanceTimeoutMs = 60000;
     private final int sessionTimeoutMs = 10000;
     private final LogContext logContext = new LogContext();
-    private final GroupCoordinatorMetricsShard metrics = new 
GroupCoordinatorMetricsShard(
-        new SnapshotRegistry(logContext),
-        Collections.emptyMap(),
-        new TopicPartition("__consumer_offsets", 0)
-    );
 
     private ClassicGroup group = null;
 
     @BeforeEach
     public void initialize() {
-        group = new ClassicGroup(logContext, "groupId", EMPTY, Time.SYSTEM, 
metrics);
+        group = new ClassicGroup(logContext, "groupId", EMPTY, Time.SYSTEM);
     }
 
     @Test
@@ -1125,7 +1113,7 @@ public class ClassicGroupTest {
         OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, 
OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
         MockTime time = new MockTime();
         long currentStateTimestamp = time.milliseconds();
-        ClassicGroup group = new ClassicGroup(new LogContext(), "groupId", 
EMPTY, time, mock(GroupCoordinatorMetricsShard.class));
+        ClassicGroup group = new ClassicGroup(new LogContext(), "groupId", 
EMPTY, time);
 
         // 1. Test no protocol type. Simple consumer case, Base timestamp 
based off of last commit timestamp.
         Optional<OffsetExpirationCondition> offsetExpirationCondition = 
group.offsetExpirationCondition();
@@ -1200,7 +1188,7 @@ public class ClassicGroupTest {
 
     @Test
     public void testIsSubscribedToTopic() {
-        ClassicGroup group = new ClassicGroup(new LogContext(), "groupId", 
EMPTY, Time.SYSTEM, mock(GroupCoordinatorMetricsShard.class));
+        ClassicGroup group = new ClassicGroup(new LogContext(), "groupId", 
EMPTY, Time.SYSTEM);
 
         // 1. group has no protocol type => not subscribed
         assertFalse(group.isSubscribedToTopic("topic"));
@@ -1262,30 +1250,9 @@ public class ClassicGroupTest {
         assertTrue(group.isSubscribedToTopic("topic"));
     }
 
-    @Test
-    public void testStateTransitionMetrics() {
-        // Confirm metrics is not updated when a new GenericGroup is created 
but only when the group transitions
-        // its state.
-        GroupCoordinatorMetricsShard metrics = 
mock(GroupCoordinatorMetricsShard.class);
-        ClassicGroup group = new ClassicGroup(new LogContext(), "groupId", 
EMPTY, Time.SYSTEM, metrics);
-        verify(metrics, times(0)).onClassicGroupStateTransition(any(), any());
-
-        group.transitionTo(PREPARING_REBALANCE);
-        verify(metrics, times(1)).onClassicGroupStateTransition(EMPTY, 
PREPARING_REBALANCE);
-
-        group.transitionTo(COMPLETING_REBALANCE);
-        verify(metrics, 
times(1)).onClassicGroupStateTransition(PREPARING_REBALANCE, 
COMPLETING_REBALANCE);
-
-        group.transitionTo(STABLE);
-        verify(metrics, 
times(1)).onClassicGroupStateTransition(COMPLETING_REBALANCE, STABLE);
-
-        group.transitionTo(DEAD);
-        verify(metrics, times(1)).onClassicGroupStateTransition(STABLE, DEAD);
-    }
-
     @Test
     public void testIsInStates() {
-        ClassicGroup group = new ClassicGroup(new LogContext(), "groupId", 
EMPTY, Time.SYSTEM, mock(GroupCoordinatorMetricsShard.class));
+        ClassicGroup group = new ClassicGroup(new LogContext(), "groupId", 
EMPTY, Time.SYSTEM);
         assertTrue(group.isInStates(Collections.singleton("empty"), 0));
 
         group.transitionTo(PREPARING_REBALANCE);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
index 950c359294a..09948b05640 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
@@ -20,9 +20,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.coordinator.group.classic.ClassicGroup;
-import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -34,13 +31,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Collections;
 import java.util.stream.IntStream;
 
-import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE;
-import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD;
-import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY;
-import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPARING_REBALANCE;
-import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
 import static 
org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue;
-import static 
org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.metricName;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class GroupCoordinatorMetricsShardTest {
@@ -98,59 +89,6 @@ public class GroupCoordinatorMetricsShardTest {
         assertEquals(0, 
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD));
     }
 
-    @Test
-    public void testGenericGroupStateTransitionMetrics() {
-        MetricsRegistry registry = new MetricsRegistry();
-        Metrics metrics = new Metrics();
-        TopicPartition tp = new TopicPartition("__consumer_offsets", 0);
-        GroupCoordinatorMetrics coordinatorMetrics = new 
GroupCoordinatorMetrics(registry, metrics);
-        GroupCoordinatorMetricsShard shard = 
coordinatorMetrics.newMetricsShard(new SnapshotRegistry(new LogContext()), tp);
-        coordinatorMetrics.activateMetricsShard(shard);
-
-        LogContext logContext = new LogContext();
-        ClassicGroup group0 = new ClassicGroup(logContext, "groupId0", EMPTY, 
Time.SYSTEM, shard);
-        ClassicGroup group1 = new ClassicGroup(logContext, "groupId1", EMPTY, 
Time.SYSTEM, shard);
-        ClassicGroup group2 = new ClassicGroup(logContext, "groupId2", EMPTY, 
Time.SYSTEM, shard);
-        ClassicGroup group3 = new ClassicGroup(logContext, "groupId3", EMPTY, 
Time.SYSTEM, shard);
-
-        IntStream.range(0, 4).forEach(__ -> 
shard.incrementNumClassicGroups(EMPTY));
-
-        assertEquals(4, shard.numClassicGroups());
-
-        group0.transitionTo(PREPARING_REBALANCE);
-        group0.transitionTo(COMPLETING_REBALANCE);
-        group1.transitionTo(PREPARING_REBALANCE);
-        group2.transitionTo(DEAD);
-
-        assertEquals(1, shard.numClassicGroups(ClassicGroupState.EMPTY));
-        assertEquals(1, 
shard.numClassicGroups(ClassicGroupState.PREPARING_REBALANCE));
-        assertEquals(1, 
shard.numClassicGroups(ClassicGroupState.COMPLETING_REBALANCE));
-        assertEquals(1, shard.numClassicGroups(ClassicGroupState.DEAD));
-        assertEquals(0, shard.numClassicGroups(ClassicGroupState.STABLE));
-
-        group0.transitionTo(STABLE);
-        group1.transitionTo(COMPLETING_REBALANCE);
-        group3.transitionTo(DEAD);
-
-        assertEquals(0, shard.numClassicGroups(ClassicGroupState.EMPTY));
-        assertEquals(0, 
shard.numClassicGroups(ClassicGroupState.PREPARING_REBALANCE));
-        assertEquals(1, 
shard.numClassicGroups(ClassicGroupState.COMPLETING_REBALANCE));
-        assertEquals(2, shard.numClassicGroups(ClassicGroupState.DEAD));
-        assertEquals(1, shard.numClassicGroups(ClassicGroupState.STABLE));
-
-        assertGaugeValue(
-            metrics,
-            metrics.metricName("group-count", "group-coordinator-metrics", 
Collections.singletonMap("protocol", "classic")),
-            4
-        );
-        assertGaugeValue(registry, metricName("GroupMetadataManager", 
"NumGroups"), 4);
-        assertGaugeValue(registry, metricName("GroupMetadataManager", 
"NumGroupsEmpty"), 0);
-        assertGaugeValue(registry, metricName("GroupMetadataManager", 
"NumGroupsPreparingRebalance"), 0);
-        assertGaugeValue(registry, metricName("GroupMetadataManager", 
"NumGroupsCompletingRebalance"), 1);
-        assertGaugeValue(registry, metricName("GroupMetadataManager", 
"NumGroupsDead"), 2);
-        assertGaugeValue(registry, metricName("GroupMetadataManager", 
"NumGroupsStable"), 1);
-    }
-
     @Test
     public void testConsumerGroupStateTransitionMetrics() {
         MetricsRegistry registry = new MetricsRegistry();
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
index cc0c759c440..0a4f491e819 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
 import 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState;
@@ -165,11 +166,19 @@ public class GroupCoordinatorMetricsTest {
         coordinatorMetrics.activateMetricsShard(shard0);
         coordinatorMetrics.activateMetricsShard(shard1);
 
-        IntStream.range(0, 5).forEach(__ -> 
shard0.incrementNumClassicGroups(ClassicGroupState.PREPARING_REBALANCE));
-        IntStream.range(0, 1).forEach(__ -> 
shard0.decrementNumClassicGroups(ClassicGroupState.COMPLETING_REBALANCE));
-        IntStream.range(0, 5).forEach(__ -> 
shard1.incrementNumClassicGroups(ClassicGroupState.STABLE));
-        IntStream.range(0, 4).forEach(__ -> 
shard1.incrementNumClassicGroups(ClassicGroupState.DEAD));
-        IntStream.range(0, 4).forEach(__ -> 
shard1.decrementNumClassicGroups(ClassicGroupState.EMPTY));
+        shard0.setClassicGroupGauges(Utils.mkMap(
+            Utils.mkEntry(ClassicGroupState.PREPARING_REBALANCE, 1L),
+            Utils.mkEntry(ClassicGroupState.COMPLETING_REBALANCE, 1L),
+            Utils.mkEntry(ClassicGroupState.STABLE, 1L),
+            Utils.mkEntry(ClassicGroupState.EMPTY, 1L)
+        ));
+        shard1.setClassicGroupGauges(Utils.mkMap(
+            Utils.mkEntry(ClassicGroupState.PREPARING_REBALANCE, 1L),
+            Utils.mkEntry(ClassicGroupState.COMPLETING_REBALANCE, 1L),
+            Utils.mkEntry(ClassicGroupState.STABLE, 1L),
+            Utils.mkEntry(ClassicGroupState.EMPTY, 1L),
+            Utils.mkEntry(ClassicGroupState.DEAD, 1L)
+        ));
 
         IntStream.range(0, 5).forEach(__ -> 
shard0.incrementNumConsumerGroups(ConsumerGroupState.ASSIGNING));
         IntStream.range(0, 5).forEach(__ -> 
shard1.incrementNumConsumerGroups(ConsumerGroupState.RECONCILING));

Reply via email to