This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cbc02e006d0 KAFKA-16106; Schedule timeout task to refresh classic
group size metric (#17325)
cbc02e006d0 is described below
commit cbc02e006d0f1805ff14c843b9c80ce9d601c23b
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Fri Oct 4 03:31:06 2024 -0400
KAFKA-16106; Schedule timeout task to refresh classic group size metric
(#17325)
In the existing implementation, if an operation modifying the classic group
state fails, the group reverts but the group size counter does not. This
creates an inconsistency between the group size metric and the actual group
size.
Considering that it would be complicated to rely on the appendFuture to
revert the metrics upon the operation failure, this PR introduces a new
implementation. A timeout task will periodically refresh the metrics based on
the current groups' soft state. The refreshing interval is hardcoded to 60
seconds.
Reviewers: David Jacot <[email protected]>
---
.../coordinator/group/GroupCoordinatorShard.java | 38 +++++++++
.../coordinator/group/GroupMetadataManager.java | 39 +++++----
.../org/apache/kafka/coordinator/group/Utils.java | 16 ++++
.../coordinator/group/classic/ClassicGroup.java | 15 +---
.../metrics/GroupCoordinatorMetricsShard.java | 85 +++----------------
.../group/GroupCoordinatorRecordHelpersTest.java | 14 +---
.../group/GroupCoordinatorShardTest.java | 43 ++++++++++
.../group/GroupMetadataManagerTest.java | 96 ++++++++++------------
.../group/classic/ClassicGroupTest.java | 41 +--------
.../metrics/GroupCoordinatorMetricsShardTest.java | 62 --------------
.../group/metrics/GroupCoordinatorMetricsTest.java | 19 +++--
11 files changed, 195 insertions(+), 273 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index c6eced2158e..70b8d88fa04 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -248,6 +248,18 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
*/
static final String GROUP_EXPIRATION_KEY = "expire-group-metadata";
+ /**
+ * The classic group size counter key to schedule a timer task.
+ *
+ * Visible for testing.
+ */
+ static final String CLASSIC_GROUP_SIZE_COUNTER_KEY =
"classic-group-size-counter";
+
+ /**
+ * Hardcoded default value of the interval to update the classic group
size counter.
+ */
+ static final int DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS = 60 * 1000;
+
/**
* The logger.
*/
@@ -677,6 +689,30 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
return new CoordinatorResult<>(records, false);
}
+ /**
+ * Schedules (or reschedules) the group size counter for the classic
groups.
+ */
+ private void scheduleClassicGroupSizeCounter() {
+ timer.schedule(
+ CLASSIC_GROUP_SIZE_COUNTER_KEY,
+ DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
+ TimeUnit.MILLISECONDS,
+ true,
+ () -> {
+ groupMetadataManager.updateClassicGroupSizeCounter();
+ scheduleClassicGroupSizeCounter();
+ return GroupMetadataManager.EMPTY_RESULT;
+ }
+ );
+ }
+
+ /**
+ * Cancels the group size counter for the classic groups.
+ */
+ private void cancelClassicGroupSizeCounter() {
+ timer.cancel(CLASSIC_GROUP_SIZE_COUNTER_KEY);
+ }
+
/**
* The coordinator has been loaded. This is used to apply any
* post loading operations (e.g. registering timers).
@@ -692,6 +728,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
groupMetadataManager.onLoaded();
scheduleGroupMetadataExpiration();
+ scheduleClassicGroupSizeCounter();
}
@Override
@@ -699,6 +736,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
timer.cancel(GROUP_EXPIRATION_KEY);
coordinatorMetrics.deactivateMetricsShard(metricsShard);
groupMetadataManager.onUnloaded();
+ cancelClassicGroupSizeCounter();
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 384c5295c97..39ac3c2e2f8 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -868,7 +868,6 @@ public class GroupMetadataManager {
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry,
groupId, metrics);
groups.put(groupId, consumerGroup);
metrics.onConsumerGroupStateTransition(null,
consumerGroup.state());
- metrics.onClassicGroupStateTransition(EMPTY, null);
return consumerGroup;
} else {
throw new IllegalStateException(String.format("Group %s is not a
consumer group", groupId));
@@ -899,9 +898,8 @@ public class GroupMetadataManager {
}
if (group == null) {
- ClassicGroup classicGroup = new ClassicGroup(logContext, groupId,
ClassicGroupState.EMPTY, time, metrics);
+ ClassicGroup classicGroup = new ClassicGroup(logContext, groupId,
ClassicGroupState.EMPTY, time);
groups.put(groupId, classicGroup);
- metrics.onClassicGroupStateTransition(null,
classicGroup.currentState());
return classicGroup;
} else {
if (group.type() == CLASSIC) {
@@ -1091,7 +1089,6 @@ public class GroupMetadataManager {
leavingMemberId,
logContext,
time,
- metrics,
metadataImage
);
} catch (SchemaException e) {
@@ -1108,17 +1105,11 @@ public class GroupMetadataManager {
// Set the appendFuture to prevent the records from being replayed.
removeGroup(consumerGroup.groupId());
groups.put(consumerGroup.groupId(), classicGroup);
- metrics.onClassicGroupStateTransition(null,
classicGroup.currentState());
classicGroup.allMembers().forEach(member ->
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
prepareRebalance(classicGroup, String.format("Downgrade group %s from
consumer to classic.", classicGroup.groupId()));
- CompletableFuture<Void> appendFuture = new CompletableFuture<>();
- appendFuture.exceptionally(__ -> {
- metrics.onClassicGroupStateTransition(classicGroup.currentState(),
null);
- return null;
- });
- return new CoordinatorResult<>(records, response, appendFuture, false);
+ return new CoordinatorResult<>(records, response, null, false);
}
/**
@@ -1204,8 +1195,7 @@ public class GroupMetadataManager {
metrics.onConsumerGroupStateTransition(consumerGroup.state(), null);
break;
case CLASSIC:
- ClassicGroup classicGroup = (ClassicGroup) group;
-
metrics.onClassicGroupStateTransition(classicGroup.currentState(), null);
+ // The classic group size counter is implemented as
scheduled task.
break;
case SHARE:
// Nothing for now, but we may want to add metrics in the
future.
@@ -3742,11 +3732,25 @@ public class GroupMetadataManager {
});
}
+ /**
+ * Counts and updates the number of classic groups in different states.
+ */
+ public void updateClassicGroupSizeCounter() {
+ Map<ClassicGroupState, Long> groupSizeCounter = new HashMap<>();
+ groups.forEach((__, group) -> {
+ if (group.type() == CLASSIC) {
+ groupSizeCounter.compute(((ClassicGroup)
group).currentState(), Utils::incValue);
+ }
+ });
+ metrics.setClassicGroupGauges(groupSizeCounter);
+ }
+
/**
* The coordinator has been loaded. Session timeouts are registered
* for all members.
*/
public void onLoaded() {
+ Map<ClassicGroupState, Long> classicGroupSizeCounter = new HashMap<>();
groups.forEach((groupId, group) -> {
switch (group.type()) {
case CONSUMER:
@@ -3780,6 +3784,8 @@ public class GroupMetadataManager {
" (size " + classicGroup.numMembers() + ") is over
capacity " + classicGroupMaxSize +
". Rebalancing in order to give a chance for
consumers to commit offsets");
}
+
+
classicGroupSizeCounter.compute(classicGroup.currentState(), Utils::incValue);
break;
case SHARE:
@@ -3791,6 +3797,7 @@ public class GroupMetadataManager {
break;
}
});
+ metrics.setClassicGroupGauges(classicGroupSizeCounter);
}
/**
@@ -3901,7 +3908,6 @@ public class GroupMetadataManager {
groupId,
loadedMembers.isEmpty() ? EMPTY : STABLE,
time,
- metrics,
value.generation(),
protocolType == null || protocolType.isEmpty() ?
Optional.empty() : Optional.of(protocolType),
Optional.ofNullable(value.protocol()),
@@ -3910,10 +3916,7 @@ public class GroupMetadataManager {
);
loadedMembers.forEach(member -> classicGroup.add(member, null));
- Group prevGroup = groups.put(groupId, classicGroup);
- if (prevGroup == null) {
- metrics.onClassicGroupStateTransition(null,
classicGroup.currentState());
- }
+ groups.put(groupId, classicGroup);
classicGroup.setSubscribedTopics(
classicGroup.computeSubscribedTopics()
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
index 91b498f62cc..5e32d17ae50 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
@@ -102,6 +102,22 @@ public class Utils {
return value == null ? 1 : value + 1;
}
+ /**
+ * Decrements value by 1; returns null when reaching zero. This helper is
+ * meant to be used with Map#compute.
+ */
+ public static Long decValue(Object key, Long value) {
+ if (value == null) return null;
+ return value == 1 ? null : value - 1;
+ }
+
+ /**
+ * Increments value by 1; This helper is meant to be used with Map#compute.
+ */
+ public static Long incValue(Object key, Long value) {
+ return value == null ? 1 : value + 1;
+ }
+
/**
* @return An Optional containing the provided string if it is not null
and not empty,
* otherwise an empty Optional.
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
index 907380289e6..265e5ea4453 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
@@ -38,7 +38,6 @@ import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.MetadataVersion;
@@ -182,24 +181,17 @@ public class ClassicGroup implements Group {
*/
private boolean newMemberAdded = false;
- /**
- * Coordinator metrics.
- */
- private final GroupCoordinatorMetricsShard metrics;
-
public ClassicGroup(
LogContext logContext,
String groupId,
ClassicGroupState initialState,
- Time time,
- GroupCoordinatorMetricsShard metrics
+ Time time
) {
this(
logContext,
groupId,
initialState,
time,
- metrics,
0,
Optional.empty(),
Optional.empty(),
@@ -213,7 +205,6 @@ public class ClassicGroup implements Group {
String groupId,
ClassicGroupState initialState,
Time time,
- GroupCoordinatorMetricsShard metrics,
int generationId,
Optional<String> protocolType,
Optional<String> protocolName,
@@ -226,7 +217,6 @@ public class ClassicGroup implements Group {
this.state = Objects.requireNonNull(initialState);
this.previousState = DEAD;
this.time = Objects.requireNonNull(time);
- this.metrics = Objects.requireNonNull(metrics);
this.generationId = generationId;
this.protocolType = protocolType;
this.protocolName = protocolName;
@@ -1013,7 +1003,6 @@ public class ClassicGroup implements Group {
previousState = state;
state = groupState;
currentStateTimestamp = Optional.of(time.milliseconds());
- metrics.onClassicGroupStateTransition(previousState, state);
}
/**
@@ -1378,7 +1367,6 @@ public class ClassicGroup implements Group {
String leavingMemberId,
LogContext logContext,
Time time,
- GroupCoordinatorMetricsShard metrics,
MetadataImage metadataImage
) {
ClassicGroup classicGroup = new ClassicGroup(
@@ -1386,7 +1374,6 @@ public class ClassicGroup implements Group {
consumerGroup.groupId(),
ClassicGroupState.STABLE,
time,
- metrics,
consumerGroup.groupEpoch(),
Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
Optional.empty(),
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
index 0892b57436b..219a4f0a22c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
@@ -29,6 +29,7 @@ import org.apache.kafka.timeline.TimelineLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
@@ -65,7 +66,7 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
/**
* Classic group size gauge counters keyed by the metric name.
*/
- private final Map<ClassicGroupState, AtomicLong> classicGroupGauges;
+ private volatile Map<ClassicGroupState, Long> classicGroupGauges;
/**
* Consumer group size gauge counters keyed by the metric name.
@@ -106,13 +107,7 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
numOffsetsTimelineGaugeCounter = new TimelineGaugeCounter(new
TimelineLong(snapshotRegistry), new AtomicLong(0));
numClassicGroupsTimelineCounter = new TimelineGaugeCounter(new
TimelineLong(snapshotRegistry), new AtomicLong(0));
- this.classicGroupGauges = Utils.mkMap(
- Utils.mkEntry(ClassicGroupState.PREPARING_REBALANCE, new
AtomicLong(0)),
- Utils.mkEntry(ClassicGroupState.COMPLETING_REBALANCE, new
AtomicLong(0)),
- Utils.mkEntry(ClassicGroupState.STABLE, new AtomicLong(0)),
- Utils.mkEntry(ClassicGroupState.DEAD, new AtomicLong(0)),
- Utils.mkEntry(ClassicGroupState.EMPTY, new AtomicLong(0))
- );
+ this.classicGroupGauges = Collections.emptyMap();
this.consumerGroupGauges = Utils.mkMap(
Utils.mkEntry(ConsumerGroupState.EMPTY,
@@ -140,13 +135,6 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
this.topicPartition = Objects.requireNonNull(topicPartition);
}
- public void incrementNumClassicGroups(ClassicGroupState state) {
- AtomicLong counter = classicGroupGauges.get(state);
- if (counter != null) {
- counter.incrementAndGet();
- }
- }
-
/**
* Increment the number of offsets.
*/
@@ -179,18 +167,6 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
}
}
- /**
- * Decrement the number of classic groups.
- *
- * @param state the classic group state.
- */
- public void decrementNumClassicGroups(ClassicGroupState state) {
- AtomicLong counter = classicGroupGauges.get(state);
- if (counter != null) {
- counter.decrementAndGet();
- }
- }
-
/**
* Decrement the number of consumer groups.
*
@@ -220,9 +196,9 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
* @return The number of classic groups in `state`.
*/
public long numClassicGroups(ClassicGroupState state) {
- AtomicLong counter = classicGroupGauges.get(state);
+ Long counter = classicGroupGauges.get(state);
if (counter != null) {
- return counter.get();
+ return counter;
}
return 0L;
}
@@ -232,7 +208,7 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
*/
public long numClassicGroups() {
return classicGroupGauges.values().stream()
- .mapToLong(AtomicLong::get).sum();
+ .mapToLong(Long::longValue).sum();
}
/**
@@ -309,53 +285,14 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
}
/**
- * Called when a classic group's state has changed. Increment/decrement
- * the counter accordingly.
+ * Sets the classicGroupGauges.
*
- * @param oldState The previous state. null value means that it's a new
group.
- * @param newState The next state. null value means that the group has
been removed.
+ * @param classicGroupGauges The new classicGroupGauges.
*/
- public void onClassicGroupStateTransition(
- ClassicGroupState oldState,
- ClassicGroupState newState
+ public void setClassicGroupGauges(
+ Map<ClassicGroupState, Long> classicGroupGauges
) {
- if (newState != null) {
- switch (newState) {
- case PREPARING_REBALANCE:
-
incrementNumClassicGroups(ClassicGroupState.PREPARING_REBALANCE);
- break;
- case COMPLETING_REBALANCE:
-
incrementNumClassicGroups(ClassicGroupState.COMPLETING_REBALANCE);
- break;
- case STABLE:
- incrementNumClassicGroups(ClassicGroupState.STABLE);
- break;
- case DEAD:
- incrementNumClassicGroups(ClassicGroupState.DEAD);
- break;
- case EMPTY:
- incrementNumClassicGroups(ClassicGroupState.EMPTY);
- }
- }
-
- if (oldState != null) {
- switch (oldState) {
- case PREPARING_REBALANCE:
-
decrementNumClassicGroups(ClassicGroupState.PREPARING_REBALANCE);
- break;
- case COMPLETING_REBALANCE:
-
decrementNumClassicGroups(ClassicGroupState.COMPLETING_REBALANCE);
- break;
- case STABLE:
- decrementNumClassicGroups(ClassicGroupState.STABLE);
- break;
- case DEAD:
- decrementNumClassicGroups(ClassicGroupState.DEAD);
- break;
- case EMPTY:
- decrementNumClassicGroups(ClassicGroupState.EMPTY);
- }
- }
+ this.classicGroupGauges = classicGroupGauges;
}
/**
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
index 38e2b9d68cf..e0a9446ce6f 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
@@ -43,7 +43,6 @@ import
org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
@@ -89,7 +88,6 @@ import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.mock;
public class GroupCoordinatorRecordHelpersTest {
@@ -514,8 +512,7 @@ public class GroupCoordinatorRecordHelpersTest {
new LogContext(),
"group-id",
ClassicGroupState.PREPARING_REBALANCE,
- time,
- mock(GroupCoordinatorMetricsShard.class)
+ time
);
Map<String, byte[]> assignment = new HashMap<>();
@@ -585,8 +582,7 @@ public class GroupCoordinatorRecordHelpersTest {
new LogContext(),
"group-id",
ClassicGroupState.PREPARING_REBALANCE,
- time,
- mock(GroupCoordinatorMetricsShard.class)
+ time
);
expectedMembers.forEach(member -> {
@@ -637,8 +633,7 @@ public class GroupCoordinatorRecordHelpersTest {
new LogContext(),
"group-id",
ClassicGroupState.PREPARING_REBALANCE,
- time,
- mock(GroupCoordinatorMetricsShard.class)
+ time
);
expectedMembers.forEach(member -> {
@@ -697,8 +692,7 @@ public class GroupCoordinatorRecordHelpersTest {
new LogContext(),
"group-id",
ClassicGroupState.PREPARING_REBALANCE,
- time,
- mock(GroupCoordinatorMetricsShard.class)
+ time
);
group.initNextGeneration();
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 9cf3de6f821..f8bfc5981cc 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -75,6 +75,8 @@ import java.util.List;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static
org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorShard.CLASSIC_GROUP_SIZE_COUNTER_KEY;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorShard.DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_EXPIRATION_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -1001,6 +1003,47 @@ public class GroupCoordinatorShardTest {
verify(groupMetadataManager,
times(0)).maybeDeleteGroup(eq("other-group-id"), any());
}
+ @Test
+ public void testScheduleClassicGroupSizeCounter() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+ CoordinatorMetricsShard metricsShard =
mock(CoordinatorMetricsShard.class);
+ MockTime time = new MockTime();
+ MockCoordinatorTimer<Void, CoordinatorRecord> timer = new
MockCoordinatorTimer<>(time);
+ GroupCoordinatorConfig config = mock(GroupCoordinatorConfig.class);
+ when(config.offsetsRetentionCheckIntervalMs()).thenReturn(60 * 60 *
1000L);
+
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ Time.SYSTEM,
+ timer,
+ config,
+ coordinatorMetrics,
+ metricsShard
+ );
+ coordinator.onLoaded(MetadataImage.EMPTY);
+
+ // The classic group size counter is scheduled.
+ assertEquals(
+ DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
+ timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs -
time.milliseconds()
+ );
+
+ // Advance the timer to trigger the update.
+ time.sleep(DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS + 1);
+ timer.poll();
+ verify(groupMetadataManager, times(1)).updateClassicGroupSizeCounter();
+
+ // The classic group size counter is scheduled.
+ assertEquals(
+ DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
+ timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs -
time.milliseconds()
+ );
+ }
+
@ParameterizedTest
@EnumSource(value = TransactionResult.class)
public void testReplayEndTransactionMarker(TransactionResult result) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 653662b859b..0239d5f6304 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -75,7 +75,6 @@ import
org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.MemberState;
@@ -152,6 +151,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -3576,6 +3576,45 @@ public class GroupMetadataManagerTest {
assertNotNull(context.timer.timeout(consumerGroupRebalanceTimeoutKey("foo",
"foo-1")));
}
+ @Test
+ public void testUpdateClassicGroupSizeCounter() {
+ String groupId0 = "group-0";
+ String groupId1 = "group-1";
+ String groupId2 = "group-2";
+ String groupId3 = "group-3";
+ String groupId4 = "group-4";
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId0, 10))
+ .build();
+
+ ClassicGroup group1 =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId1, true);
+ ClassicGroup group2 =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId2, true);
+ ClassicGroup group3 =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId3, true);
+ ClassicGroup group4 =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId4, true);
+
+ context.groupMetadataManager.updateClassicGroupSizeCounter();
+ verify(context.metrics, times(1)).setClassicGroupGauges(eq(Utils.mkMap(
+ Utils.mkEntry(ClassicGroupState.EMPTY, 4L)
+ )));
+
+ group1.transitionTo(PREPARING_REBALANCE);
+ group2.transitionTo(PREPARING_REBALANCE);
+ group2.transitionTo(COMPLETING_REBALANCE);
+ group3.transitionTo(PREPARING_REBALANCE);
+ group3.transitionTo(COMPLETING_REBALANCE);
+ group3.transitionTo(STABLE);
+ group4.transitionTo(DEAD);
+
+ context.groupMetadataManager.updateClassicGroupSizeCounter();
+ verify(context.metrics, times(1)).setClassicGroupGauges(eq(Utils.mkMap(
+ Utils.mkEntry(ClassicGroupState.PREPARING_REBALANCE, 1L),
+ Utils.mkEntry(ClassicGroupState.COMPLETING_REBALANCE, 1L),
+ Utils.mkEntry(ClassicGroupState.STABLE, 1L),
+ Utils.mkEntry(ClassicGroupState.DEAD, 1L)
+ )));
+ }
+
@Test
public void testGenerateRecordsOnNewClassicGroup() throws Exception {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
@@ -3678,11 +3717,6 @@ public class GroupMetadataManagerTest {
"group-id",
STABLE,
context.time,
- new GroupCoordinatorMetricsShard(
- context.snapshotRegistry,
- Collections.emptyMap(),
- new TopicPartition("__consumer_offsets", 0)
- ),
1,
Optional.of("consumer"),
Optional.of("range"),
@@ -9501,30 +9535,6 @@ public class GroupMetadataManagerTest {
verify(context.metrics).record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
}
- @Test
- public void testOnClassicGroupStateTransition() {
- GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
- .build();
-
- // Creating a classic group should increment metric.
- ClassicGroup group = context.createClassicGroup("group-id");
- verify(context.metrics, times(1)).onClassicGroupStateTransition(null,
EMPTY);
-
- // Replaying a new group should not increment metric as the group was
already created.
-
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group,
Collections.emptyMap(), MetadataVersion.LATEST_PRODUCTION));
- verify(context.metrics, times(1)).onClassicGroupStateTransition(null,
EMPTY);
-
- // Loading a tombstone should remove group and decrement metric.
- context.createClassicGroup("group-id");
-
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
- verify(context.metrics, times(1)).onClassicGroupStateTransition(EMPTY,
null);
- assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.group("group-id"));
-
- // Replaying a tombstone for a group that has already been deleted
should not decrement metric.
-
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
- verify(context.metrics, times(1)).onClassicGroupStateTransition(EMPTY,
null);
- }
-
@Test
public void testOnClassicGroupStateTransitionOnLoading() {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
@@ -9534,8 +9544,7 @@ public class GroupMetadataManagerTest {
new LogContext(),
"group-id",
EMPTY,
- context.time,
- context.metrics
+ context.time
);
// Even if there are more group metadata records loaded than tombstone
records, the last replayed record
@@ -9546,9 +9555,6 @@ public class GroupMetadataManagerTest {
IntStream.range(0, 4).forEach(__ ->
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id"))
);
-
- verify(context.metrics, times(1)).onClassicGroupStateTransition(null,
EMPTY);
- verify(context.metrics, times(1)).onClassicGroupStateTransition(EMPTY,
null);
}
@Test
@@ -9609,8 +9615,7 @@ public class GroupMetadataManagerTest {
new LogContext(),
classicGroupId,
EMPTY,
- context.time,
- context.metrics
+ context.time
);
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup,
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
@@ -9638,8 +9643,7 @@ public class GroupMetadataManagerTest {
new LogContext(),
classicGroupId,
EMPTY,
- context.time,
- context.metrics
+ context.time
);
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup,
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
@@ -10883,7 +10887,6 @@ public class GroupMetadataManagerTest {
groupId,
STABLE,
context.time,
- context.metrics,
10,
Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
Optional.of("range"),
@@ -10927,7 +10930,6 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(expectedRecords.subList(7, 10),
result.records().subList(7, 10));
verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
null);
- verify(context.metrics, times(1)).onClassicGroupStateTransition(null,
STABLE);
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
@@ -10943,14 +10945,6 @@ public class GroupMetadataManagerTest {
// A new rebalance is triggered.
ClassicGroup classicGroup =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
-
- // Simulate a failed write to the log.
- result.appendFuture().completeExceptionally(new
NotLeaderOrFollowerException());
- context.rollback();
-
- // The group is reverted back to the consumer group.
- assertEquals(consumerGroup,
context.groupMetadataManager.consumerGroup(groupId));
- verify(context.metrics,
times(1)).onClassicGroupStateTransition(PREPARING_REBALANCE, null);
}
@Test
@@ -11076,7 +11070,6 @@ public class GroupMetadataManagerTest {
groupId,
STABLE,
context.time,
- context.metrics,
10,
Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
Optional.of("range"),
@@ -11119,7 +11112,6 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(expectedRecords.subList(7, 10),
timeout.result.records().subList(7, 10));
verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
null);
- verify(context.metrics, times(1)).onClassicGroupStateTransition(null,
STABLE);
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
@@ -11288,7 +11280,6 @@ public class GroupMetadataManagerTest {
groupId,
STABLE,
context.time,
- context.metrics,
11,
Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
Optional.of("range"),
@@ -11332,7 +11323,6 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(expectedRecords.subList(7, 10),
timeout.result.records().subList(7, 10));
verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.RECONCILING,
null);
- verify(context.metrics, times(1)).onClassicGroupStateTransition(null,
STABLE);
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index 1c5dfaa1054..1d496c4b04b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.coordinator.group.classic;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
@@ -39,8 +38,6 @@ import
org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
-import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
-import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -70,10 +67,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
public class ClassicGroupTest {
private final String protocolType = "consumer";
@@ -84,17 +77,12 @@ public class ClassicGroupTest {
private final int rebalanceTimeoutMs = 60000;
private final int sessionTimeoutMs = 10000;
private final LogContext logContext = new LogContext();
- private final GroupCoordinatorMetricsShard metrics = new
GroupCoordinatorMetricsShard(
- new SnapshotRegistry(logContext),
- Collections.emptyMap(),
- new TopicPartition("__consumer_offsets", 0)
- );
private ClassicGroup group = null;
@BeforeEach
public void initialize() {
- group = new ClassicGroup(logContext, "groupId", EMPTY, Time.SYSTEM,
metrics);
+ group = new ClassicGroup(logContext, "groupId", EMPTY, Time.SYSTEM);
}
@Test
@@ -1125,7 +1113,7 @@ public class ClassicGroupTest {
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L,
OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
MockTime time = new MockTime();
long currentStateTimestamp = time.milliseconds();
- ClassicGroup group = new ClassicGroup(new LogContext(), "groupId",
EMPTY, time, mock(GroupCoordinatorMetricsShard.class));
+ ClassicGroup group = new ClassicGroup(new LogContext(), "groupId",
EMPTY, time);
// 1. Test no protocol type. Simple consumer case, Base timestamp
based off of last commit timestamp.
Optional<OffsetExpirationCondition> offsetExpirationCondition =
group.offsetExpirationCondition();
@@ -1200,7 +1188,7 @@ public class ClassicGroupTest {
@Test
public void testIsSubscribedToTopic() {
- ClassicGroup group = new ClassicGroup(new LogContext(), "groupId",
EMPTY, Time.SYSTEM, mock(GroupCoordinatorMetricsShard.class));
+ ClassicGroup group = new ClassicGroup(new LogContext(), "groupId",
EMPTY, Time.SYSTEM);
// 1. group has no protocol type => not subscribed
assertFalse(group.isSubscribedToTopic("topic"));
@@ -1262,30 +1250,9 @@ public class ClassicGroupTest {
assertTrue(group.isSubscribedToTopic("topic"));
}
- @Test
- public void testStateTransitionMetrics() {
- // Confirm metrics is not updated when a new GenericGroup is created
but only when the group transitions
- // its state.
- GroupCoordinatorMetricsShard metrics =
mock(GroupCoordinatorMetricsShard.class);
- ClassicGroup group = new ClassicGroup(new LogContext(), "groupId",
EMPTY, Time.SYSTEM, metrics);
- verify(metrics, times(0)).onClassicGroupStateTransition(any(), any());
-
- group.transitionTo(PREPARING_REBALANCE);
- verify(metrics, times(1)).onClassicGroupStateTransition(EMPTY,
PREPARING_REBALANCE);
-
- group.transitionTo(COMPLETING_REBALANCE);
- verify(metrics,
times(1)).onClassicGroupStateTransition(PREPARING_REBALANCE,
COMPLETING_REBALANCE);
-
- group.transitionTo(STABLE);
- verify(metrics,
times(1)).onClassicGroupStateTransition(COMPLETING_REBALANCE, STABLE);
-
- group.transitionTo(DEAD);
- verify(metrics, times(1)).onClassicGroupStateTransition(STABLE, DEAD);
- }
-
@Test
public void testIsInStates() {
- ClassicGroup group = new ClassicGroup(new LogContext(), "groupId",
EMPTY, Time.SYSTEM, mock(GroupCoordinatorMetricsShard.class));
+ ClassicGroup group = new ClassicGroup(new LogContext(), "groupId",
EMPTY, Time.SYSTEM);
assertTrue(group.isInStates(Collections.singleton("empty"), 0));
group.transitionTo(PREPARING_REBALANCE);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
index 950c359294a..09948b05640 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
@@ -20,9 +20,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.coordinator.group.classic.ClassicGroup;
-import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -34,13 +31,7 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.stream.IntStream;
-import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE;
-import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD;
-import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY;
-import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPARING_REBALANCE;
-import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
import static
org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue;
-import static
org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.metricName;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class GroupCoordinatorMetricsShardTest {
@@ -98,59 +89,6 @@ public class GroupCoordinatorMetricsShardTest {
assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD));
}
- @Test
- public void testGenericGroupStateTransitionMetrics() {
- MetricsRegistry registry = new MetricsRegistry();
- Metrics metrics = new Metrics();
- TopicPartition tp = new TopicPartition("__consumer_offsets", 0);
- GroupCoordinatorMetrics coordinatorMetrics = new
GroupCoordinatorMetrics(registry, metrics);
- GroupCoordinatorMetricsShard shard =
coordinatorMetrics.newMetricsShard(new SnapshotRegistry(new LogContext()), tp);
- coordinatorMetrics.activateMetricsShard(shard);
-
- LogContext logContext = new LogContext();
- ClassicGroup group0 = new ClassicGroup(logContext, "groupId0", EMPTY,
Time.SYSTEM, shard);
- ClassicGroup group1 = new ClassicGroup(logContext, "groupId1", EMPTY,
Time.SYSTEM, shard);
- ClassicGroup group2 = new ClassicGroup(logContext, "groupId2", EMPTY,
Time.SYSTEM, shard);
- ClassicGroup group3 = new ClassicGroup(logContext, "groupId3", EMPTY,
Time.SYSTEM, shard);
-
- IntStream.range(0, 4).forEach(__ ->
shard.incrementNumClassicGroups(EMPTY));
-
- assertEquals(4, shard.numClassicGroups());
-
- group0.transitionTo(PREPARING_REBALANCE);
- group0.transitionTo(COMPLETING_REBALANCE);
- group1.transitionTo(PREPARING_REBALANCE);
- group2.transitionTo(DEAD);
-
- assertEquals(1, shard.numClassicGroups(ClassicGroupState.EMPTY));
- assertEquals(1,
shard.numClassicGroups(ClassicGroupState.PREPARING_REBALANCE));
- assertEquals(1,
shard.numClassicGroups(ClassicGroupState.COMPLETING_REBALANCE));
- assertEquals(1, shard.numClassicGroups(ClassicGroupState.DEAD));
- assertEquals(0, shard.numClassicGroups(ClassicGroupState.STABLE));
-
- group0.transitionTo(STABLE);
- group1.transitionTo(COMPLETING_REBALANCE);
- group3.transitionTo(DEAD);
-
- assertEquals(0, shard.numClassicGroups(ClassicGroupState.EMPTY));
- assertEquals(0,
shard.numClassicGroups(ClassicGroupState.PREPARING_REBALANCE));
- assertEquals(1,
shard.numClassicGroups(ClassicGroupState.COMPLETING_REBALANCE));
- assertEquals(2, shard.numClassicGroups(ClassicGroupState.DEAD));
- assertEquals(1, shard.numClassicGroups(ClassicGroupState.STABLE));
-
- assertGaugeValue(
- metrics,
- metrics.metricName("group-count", "group-coordinator-metrics",
Collections.singletonMap("protocol", "classic")),
- 4
- );
- assertGaugeValue(registry, metricName("GroupMetadataManager",
"NumGroups"), 4);
- assertGaugeValue(registry, metricName("GroupMetadataManager",
"NumGroupsEmpty"), 0);
- assertGaugeValue(registry, metricName("GroupMetadataManager",
"NumGroupsPreparingRebalance"), 0);
- assertGaugeValue(registry, metricName("GroupMetadataManager",
"NumGroupsCompletingRebalance"), 1);
- assertGaugeValue(registry, metricName("GroupMetadataManager",
"NumGroupsDead"), 2);
- assertGaugeValue(registry, metricName("GroupMetadataManager",
"NumGroupsStable"), 1);
- }
-
@Test
public void testConsumerGroupStateTransitionMetrics() {
MetricsRegistry registry = new MetricsRegistry();
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
index cc0c759c440..0a4f491e819 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState;
@@ -165,11 +166,19 @@ public class GroupCoordinatorMetricsTest {
coordinatorMetrics.activateMetricsShard(shard0);
coordinatorMetrics.activateMetricsShard(shard1);
- IntStream.range(0, 5).forEach(__ ->
shard0.incrementNumClassicGroups(ClassicGroupState.PREPARING_REBALANCE));
- IntStream.range(0, 1).forEach(__ ->
shard0.decrementNumClassicGroups(ClassicGroupState.COMPLETING_REBALANCE));
- IntStream.range(0, 5).forEach(__ ->
shard1.incrementNumClassicGroups(ClassicGroupState.STABLE));
- IntStream.range(0, 4).forEach(__ ->
shard1.incrementNumClassicGroups(ClassicGroupState.DEAD));
- IntStream.range(0, 4).forEach(__ ->
shard1.decrementNumClassicGroups(ClassicGroupState.EMPTY));
+ shard0.setClassicGroupGauges(Utils.mkMap(
+ Utils.mkEntry(ClassicGroupState.PREPARING_REBALANCE, 1L),
+ Utils.mkEntry(ClassicGroupState.COMPLETING_REBALANCE, 1L),
+ Utils.mkEntry(ClassicGroupState.STABLE, 1L),
+ Utils.mkEntry(ClassicGroupState.EMPTY, 1L)
+ ));
+ shard1.setClassicGroupGauges(Utils.mkMap(
+ Utils.mkEntry(ClassicGroupState.PREPARING_REBALANCE, 1L),
+ Utils.mkEntry(ClassicGroupState.COMPLETING_REBALANCE, 1L),
+ Utils.mkEntry(ClassicGroupState.STABLE, 1L),
+ Utils.mkEntry(ClassicGroupState.EMPTY, 1L),
+ Utils.mkEntry(ClassicGroupState.DEAD, 1L)
+ ));
IntStream.range(0, 5).forEach(__ ->
shard0.incrementNumConsumerGroups(ConsumerGroupState.ASSIGNING));
IntStream.range(0, 5).forEach(__ ->
shard1.incrementNumConsumerGroups(ConsumerGroupState.RECONCILING));