This is an automated email from the ASF dual-hosted git repository.
jeffkbkim pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1a106e45387 KAFKA-18655: Implement the consumer group size counter
with scheduled task (#18717)
1a106e45387 is described below
commit 1a106e453872403ae6803a3ca30d7f8b379ae26e
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Mon Feb 3 10:50:21 2025 -0500
KAFKA-18655: Implement the consumer group size counter with scheduled task
(#18717)
During testing we discovered that the empty group count is not updated in
group conversion, but when the new group is transition to other state, the
empty group count is decremented. This could result in negative empty group
count.
We can have a new consumer group count implementation that follows the
pattern we did for the classic group count. The timeout task periodically
refreshes the metrics based on the current groups soft state.
Reviewers: Jeff Kim <[email protected]>
---
.../coordinator/group/GroupCoordinatorShard.java | 24 ++--
.../coordinator/group/GroupMetadataManager.java | 43 +++---
.../metrics/GroupCoordinatorMetricsShard.java | 117 +++-------------
.../group/modern/consumer/ConsumerGroup.java | 2 -
.../group/GroupCoordinatorShardTest.java | 14 +-
.../group/GroupMetadataManagerTest.java | 148 +++++++--------------
.../metrics/GroupCoordinatorMetricsShardTest.java | 124 -----------------
.../group/metrics/GroupCoordinatorMetricsTest.java | 9 +-
.../group/modern/consumer/ConsumerGroupTest.java | 43 ------
9 files changed, 107 insertions(+), 417 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 85fb54761b6..86c7e8abba7 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -258,11 +258,11 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
static final String GROUP_EXPIRATION_KEY = "expire-group-metadata";
/**
- * The classic group size counter key to schedule a timer task.
+ * The classic and consumer group size counter key to schedule a timer
task.
*
* Visible for testing.
*/
- static final String CLASSIC_GROUP_SIZE_COUNTER_KEY =
"classic-group-size-counter";
+ static final String GROUP_SIZE_COUNTER_KEY = "group-size-counter";
/**
* Hardcoded default value of the interval to update the classic group
size counter.
@@ -699,27 +699,27 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
}
/**
- * Schedules (or reschedules) the group size counter for the classic
groups.
+ * Schedules (or reschedules) the group size counter for the
classic/consumer groups.
*/
- private void scheduleClassicGroupSizeCounter() {
+ private void scheduleGroupSizeCounter() {
timer.schedule(
- CLASSIC_GROUP_SIZE_COUNTER_KEY,
+ GROUP_SIZE_COUNTER_KEY,
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
TimeUnit.MILLISECONDS,
true,
() -> {
- groupMetadataManager.updateClassicGroupSizeCounter();
- scheduleClassicGroupSizeCounter();
+ groupMetadataManager.updateGroupSizeCounter();
+ scheduleGroupSizeCounter();
return GroupMetadataManager.EMPTY_RESULT;
}
);
}
/**
- * Cancels the group size counter for the classic groups.
+ * Cancels the group size counter for the classic/consumer groups.
*/
- private void cancelClassicGroupSizeCounter() {
- timer.cancel(CLASSIC_GROUP_SIZE_COUNTER_KEY);
+ private void cancelGroupSizeCounter() {
+ timer.cancel(GROUP_SIZE_COUNTER_KEY);
}
/**
@@ -736,7 +736,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
groupMetadataManager.onLoaded();
scheduleGroupMetadataExpiration();
- scheduleClassicGroupSizeCounter();
+ scheduleGroupSizeCounter();
}
@Override
@@ -744,7 +744,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
timer.cancel(GROUP_EXPIRATION_KEY);
coordinatorMetrics.deactivateMetricsShard(metricsShard);
groupMetadataManager.onUnloaded();
- cancelClassicGroupSizeCounter();
+ cancelGroupSizeCounter();
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index e1e151d53d3..ab2720680d8 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -188,7 +188,6 @@ import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABL
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.modern.ModernGroupMember.hasAssignedPartitionsChanged;
-import static
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;
/**
* The GroupMetadataManager manages the metadata of all classic and consumer
groups. It holds
@@ -745,7 +744,6 @@ public class GroupMetadataManager {
if (group == null) {
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry,
groupId, metrics);
groups.put(groupId, consumerGroup);
- metrics.onConsumerGroupStateTransition(null,
consumerGroup.state());
return consumerGroup;
} else if (group.type() == CONSUMER) {
return (ConsumerGroup) group;
@@ -756,7 +754,6 @@ public class GroupMetadataManager {
// replaying consumer group records after offset commit records
would not work.
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry,
groupId, metrics);
groups.put(groupId, consumerGroup);
- metrics.onConsumerGroupStateTransition(null,
consumerGroup.state());
return consumerGroup;
} else {
throw new IllegalStateException(String.format("Group %s is not a
consumer group", groupId));
@@ -1134,24 +1131,7 @@ public class GroupMetadataManager {
private void removeGroup(
String groupId
) {
- Group group = groups.remove(groupId);
- if (group != null) {
- switch (group.type()) {
- case CONSUMER:
- ConsumerGroup consumerGroup = (ConsumerGroup) group;
-
metrics.onConsumerGroupStateTransition(consumerGroup.state(), null);
- break;
- case CLASSIC:
- // The classic group size counter is implemented as
scheduled task.
- break;
- case SHARE:
- // Nothing for now, but we may want to add metrics in the
future.
- break;
- default:
- log.warn("Removed group {} with an unknown group type
{}.", groupId, group.type());
- break;
- }
- }
+ groups.remove(groupId);
}
/**
@@ -4137,16 +4117,25 @@ public class GroupMetadataManager {
}
/**
- * Counts and updates the number of classic groups in different states.
+ * Counts and updates the number of classic and consumer groups in
different states.
*/
- public void updateClassicGroupSizeCounter() {
- Map<ClassicGroupState, Long> groupSizeCounter = new HashMap<>();
+ public void updateGroupSizeCounter() {
+ Map<ClassicGroupState, Long> classicGroupSizeCounter = new HashMap<>();
+ Map<ConsumerGroup.ConsumerGroupState, Long> consumerGroupSizeCounter =
new HashMap<>();
groups.forEach((__, group) -> {
- if (group.type() == CLASSIC) {
- groupSizeCounter.compute(((ClassicGroup)
group).currentState(), Utils::incValue);
+ switch (group.type()) {
+ case CLASSIC:
+ classicGroupSizeCounter.compute(((ClassicGroup)
group).currentState(), Utils::incValue);
+ break;
+ case CONSUMER:
+ consumerGroupSizeCounter.compute(((ConsumerGroup)
group).state(), Utils::incValue);
+ break;
+ default:
+ break;
}
});
- metrics.setClassicGroupGauges(groupSizeCounter);
+ metrics.setClassicGroupGauges(classicGroupSizeCounter);
+ metrics.setConsumerGroupGauges(consumerGroupSizeCounter);
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
index 219a4f0a22c..1ed75229f58 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
@@ -71,7 +71,7 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
/**
* Consumer group size gauge counters keyed by the metric name.
*/
- private final Map<ConsumerGroupState, TimelineGaugeCounter>
consumerGroupGauges;
+ private volatile Map<ConsumerGroupState, Long> consumerGroupGauges;
/**
* Share group size gauge counters keyed by the metric name.
@@ -108,19 +108,7 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
numClassicGroupsTimelineCounter = new TimelineGaugeCounter(new
TimelineLong(snapshotRegistry), new AtomicLong(0));
this.classicGroupGauges = Collections.emptyMap();
-
- this.consumerGroupGauges = Utils.mkMap(
- Utils.mkEntry(ConsumerGroupState.EMPTY,
- new TimelineGaugeCounter(new TimelineLong(snapshotRegistry),
new AtomicLong(0))),
- Utils.mkEntry(ConsumerGroupState.ASSIGNING,
- new TimelineGaugeCounter(new TimelineLong(snapshotRegistry),
new AtomicLong(0))),
- Utils.mkEntry(ConsumerGroupState.RECONCILING,
- new TimelineGaugeCounter(new TimelineLong(snapshotRegistry),
new AtomicLong(0))),
- Utils.mkEntry(ConsumerGroupState.STABLE,
- new TimelineGaugeCounter(new TimelineLong(snapshotRegistry),
new AtomicLong(0))),
- Utils.mkEntry(ConsumerGroupState.DEAD,
- new TimelineGaugeCounter(new TimelineLong(snapshotRegistry),
new AtomicLong(0)))
- );
+ this.consumerGroupGauges = Collections.emptyMap();
this.shareGroupGauges = Utils.mkMap(
Utils.mkEntry(ShareGroup.ShareGroupState.EMPTY,
@@ -145,17 +133,15 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
}
/**
- * Increment the number of consumer groups.
+ * Set the number of consumer groups.
+ * This method should be the only way to update the map and is called by
the scheduled task
+ * that updates the metrics in {@link
org.apache.kafka.coordinator.group.GroupCoordinatorShard}.
+ * Breaking this will result in inconsistent behavior.
*
- * @param state the consumer group state.
+ * @param consumerGroupGauges The map counting the number of consumer
groups in each state.
*/
- public void incrementNumConsumerGroups(ConsumerGroupState state) {
- TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
- if (gaugeCounter != null) {
- synchronized (gaugeCounter.timelineLong) {
- gaugeCounter.timelineLong.increment();
- }
- }
+ public void setConsumerGroupGauges(Map<ConsumerGroupState, Long>
consumerGroupGauges) {
+ this.consumerGroupGauges = consumerGroupGauges;
}
/**
@@ -167,20 +153,6 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
}
}
- /**
- * Decrement the number of consumer groups.
- *
- * @param state the consumer group state.
- */
- public void decrementNumConsumerGroups(ConsumerGroupState state) {
- TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
- if (gaugeCounter != null) {
- synchronized (gaugeCounter.timelineLong) {
- gaugeCounter.timelineLong.decrement();
- }
- }
- }
-
/**
* @return The number of offsets.
*/
@@ -219,9 +191,9 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
* @return The number of consumer groups in `state`.
*/
public long numConsumerGroups(ConsumerGroupState state) {
- TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
- if (gaugeCounter != null) {
- return gaugeCounter.atomicLong.get();
+ Long counter = consumerGroupGauges.get(state);
+ if (counter != null) {
+ return counter;
}
return 0L;
}
@@ -231,7 +203,7 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
*/
public long numConsumerGroups() {
return consumerGroupGauges.values().stream()
- .mapToLong(timelineGaugeCounter ->
timelineGaugeCounter.atomicLong.get()).sum();
+ .mapToLong(Long::longValue).sum();
}
@Override
@@ -257,14 +229,6 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
@Override
public void commitUpTo(long offset) {
- this.consumerGroupGauges.forEach((__, gaugeCounter) -> {
- long value;
- synchronized (gaugeCounter.timelineLong) {
- value = gaugeCounter.timelineLong.get(offset);
- }
- gaugeCounter.atomicLong.set(value);
- });
-
synchronized (numClassicGroupsTimelineCounter.timelineLong) {
long value =
numClassicGroupsTimelineCounter.timelineLong.get(offset);
numClassicGroupsTimelineCounter.atomicLong.set(value);
@@ -286,8 +250,11 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
/**
* Sets the classicGroupGauges.
+ * This method should be the only way to update the map and is called by
the scheduled task
+ * that updates the metrics in {@link
org.apache.kafka.coordinator.group.GroupCoordinatorShard}.
+ * Breaking this will result in inconsistent behavior.
*
- * @param classicGroupGauges The new classicGroupGauges.
+ * @param classicGroupGauges The map counting the number of classic groups
in each state.
*/
public void setClassicGroupGauges(
Map<ClassicGroupState, Long> classicGroupGauges
@@ -295,56 +262,6 @@ public class GroupCoordinatorMetricsShard implements
CoordinatorMetricsShard {
this.classicGroupGauges = classicGroupGauges;
}
- /**
- * Called when a consumer group's state has changed. Increment/decrement
- * the counter accordingly.
- *
- * @param oldState The previous state. null value means that it's a new
group.
- * @param newState The next state. null value means that the group has
been removed.
- */
- public void onConsumerGroupStateTransition(
- ConsumerGroupState oldState,
- ConsumerGroupState newState
- ) {
- if (newState != null) {
- switch (newState) {
- case EMPTY:
- incrementNumConsumerGroups(ConsumerGroupState.EMPTY);
- break;
- case ASSIGNING:
- incrementNumConsumerGroups(ConsumerGroupState.ASSIGNING);
- break;
- case RECONCILING:
- incrementNumConsumerGroups(ConsumerGroupState.RECONCILING);
- break;
- case STABLE:
- incrementNumConsumerGroups(ConsumerGroupState.STABLE);
- break;
- case DEAD:
- incrementNumConsumerGroups(ConsumerGroupState.DEAD);
- }
- }
-
- if (oldState != null) {
- switch (oldState) {
- case EMPTY:
- decrementNumConsumerGroups(ConsumerGroupState.EMPTY);
- break;
- case ASSIGNING:
- decrementNumConsumerGroups(ConsumerGroupState.ASSIGNING);
- break;
- case RECONCILING:
- decrementNumConsumerGroups(ConsumerGroupState.RECONCILING);
- break;
- case STABLE:
- decrementNumConsumerGroups(ConsumerGroupState.STABLE);
- break;
- case DEAD:
- decrementNumConsumerGroups(ConsumerGroupState.DEAD);
- }
- }
- }
-
public void incrementNumShareGroups(ShareGroup.ShareGroupState state) {
TimelineGaugeCounter gaugeCounter = shareGroupGauges.get(state);
if (gaugeCounter != null) {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 58d70ffe99a..16e0d0fae4a 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -885,7 +885,6 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
@Override
protected void maybeUpdateGroupState() {
- ConsumerGroupState previousState = state.get();
ConsumerGroupState newState = STABLE;
if (members.isEmpty()) {
newState = EMPTY;
@@ -901,7 +900,6 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
}
state.set(newState);
- metrics.onConsumerGroupStateTransition(previousState, newState);
}
/**
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 4648f35670b..ea0c2b02e4d 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -79,9 +79,9 @@ import java.util.List;
import java.util.Set;
import static
org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorShard.CLASSIC_GROUP_SIZE_COUNTER_KEY;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorShard.DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_EXPIRATION_KEY;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_SIZE_COUNTER_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -1024,7 +1024,7 @@ public class GroupCoordinatorShardTest {
}
@Test
- public void testScheduleClassicGroupSizeCounter() {
+ public void testScheduleGroupSizeCounter() {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
@@ -1046,21 +1046,21 @@ public class GroupCoordinatorShardTest {
);
coordinator.onLoaded(MetadataImage.EMPTY);
- // The classic group size counter is scheduled.
+ // The counter is scheduled.
assertEquals(
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
- timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs -
time.milliseconds()
+ timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs -
time.milliseconds()
);
// Advance the timer to trigger the update.
time.sleep(DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS + 1);
timer.poll();
- verify(groupMetadataManager, times(1)).updateClassicGroupSizeCounter();
+ verify(groupMetadataManager, times(1)).updateGroupSizeCounter();
- // The classic group size counter is scheduled.
+ // The counter is scheduled.
assertEquals(
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
- timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs -
time.milliseconds()
+ timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs -
time.milliseconds()
);
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 4e4e7cf7645..1287f121327 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -3589,42 +3589,71 @@ public class GroupMetadataManagerTest {
}
@Test
- public void testUpdateClassicGroupSizeCounter() {
- String groupId0 = "group-0";
- String groupId1 = "group-1";
- String groupId2 = "group-2";
- String groupId3 = "group-3";
- String groupId4 = "group-4";
+ public void testUpdateGroupSizeCounter() {
+ List<String> groupIds = new ArrayList<>();
+ IntStream.range(0, 8).forEach(i -> groupIds.add("group-" + i));
+ List<String> consumerMemberIds = List.of("consumer-member-id-0",
"consumer-member-id-1", "consumer-member-id-2");
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
- .withConsumerGroup(new ConsumerGroupBuilder(groupId0, 10))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(0), 10))
// Empty group
+ .withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(1), 10)
// Stable group
+ .withAssignmentEpoch(10)
+ .withMember(new
ConsumerGroupMember.Builder(consumerMemberIds.get(0))
+ .setMemberEpoch(10)
+ .build()))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(2), 10)
// Assigning group
+ .withAssignmentEpoch(9)
+ .withMember(new
ConsumerGroupMember.Builder(consumerMemberIds.get(1))
+ .setMemberEpoch(9)
+ .build()))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(3), 10)
// Reconciling group
+ .withAssignmentEpoch(10)
+ .withMember(new
ConsumerGroupMember.Builder(consumerMemberIds.get(2))
+ .setMemberEpoch(9)
+ .build()))
.build();
- ClassicGroup group1 =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId1, true);
- ClassicGroup group2 =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId2, true);
- ClassicGroup group3 =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId3, true);
- ClassicGroup group4 =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId4, true);
+ ClassicGroup group4 =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(4),
true);
+ ClassicGroup group5 =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(5),
true);
+ ClassicGroup group6 =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(6),
true);
+ ClassicGroup group7 =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(7),
true);
- context.groupMetadataManager.updateClassicGroupSizeCounter();
+ context.groupMetadataManager.updateGroupSizeCounter();
verify(context.metrics, times(1)).setClassicGroupGauges(eq(Utils.mkMap(
Utils.mkEntry(ClassicGroupState.EMPTY, 4L)
)));
+ verify(context.metrics,
times(1)).setConsumerGroupGauges(eq(Utils.mkMap(
+ Utils.mkEntry(ConsumerGroup.ConsumerGroupState.EMPTY, 1L),
+ Utils.mkEntry(ConsumerGroup.ConsumerGroupState.ASSIGNING, 1L),
+ Utils.mkEntry(ConsumerGroup.ConsumerGroupState.RECONCILING, 1L),
+ Utils.mkEntry(ConsumerGroup.ConsumerGroupState.STABLE, 1L)
+ )));
- group1.transitionTo(PREPARING_REBALANCE);
- group2.transitionTo(PREPARING_REBALANCE);
- group2.transitionTo(COMPLETING_REBALANCE);
- group3.transitionTo(PREPARING_REBALANCE);
- group3.transitionTo(COMPLETING_REBALANCE);
- group3.transitionTo(STABLE);
- group4.transitionTo(DEAD);
+ group4.transitionTo(PREPARING_REBALANCE);
+ group5.transitionTo(PREPARING_REBALANCE);
+ group5.transitionTo(COMPLETING_REBALANCE);
+ group6.transitionTo(PREPARING_REBALANCE);
+ group6.transitionTo(COMPLETING_REBALANCE);
+ group6.transitionTo(STABLE);
+ group7.transitionTo(DEAD);
- context.groupMetadataManager.updateClassicGroupSizeCounter();
+
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupIds.get(1),
false, Collections.emptyList())
+ .removeMember(consumerMemberIds.get(0));
+
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupIds.get(3),
false, Collections.emptyList())
+ .updateMember(new
ConsumerGroupMember.Builder(consumerMemberIds.get(2)).setMemberEpoch(10).build());
+
+ context.groupMetadataManager.updateGroupSizeCounter();
verify(context.metrics, times(1)).setClassicGroupGauges(eq(Utils.mkMap(
Utils.mkEntry(ClassicGroupState.PREPARING_REBALANCE, 1L),
Utils.mkEntry(ClassicGroupState.COMPLETING_REBALANCE, 1L),
Utils.mkEntry(ClassicGroupState.STABLE, 1L),
Utils.mkEntry(ClassicGroupState.DEAD, 1L)
)));
+ verify(context.metrics,
times(1)).setConsumerGroupGauges(eq(Utils.mkMap(
+ Utils.mkEntry(ConsumerGroup.ConsumerGroupState.EMPTY, 2L),
+ Utils.mkEntry(ConsumerGroup.ConsumerGroupState.ASSIGNING, 1L),
+ Utils.mkEntry(ConsumerGroup.ConsumerGroupState.STABLE, 1L)
+ )));
}
@Test
@@ -9548,75 +9577,6 @@ public class GroupMetadataManagerTest {
verify(context.metrics).record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
}
- @Test
- public void testOnClassicGroupStateTransitionOnLoading() {
- GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
- .build();
-
- ClassicGroup group = new ClassicGroup(
- new LogContext(),
- "group-id",
- EMPTY,
- context.time
- );
-
- // Even if there are more group metadata records loaded than tombstone
records, the last replayed record
- // (tombstone in this test) is the latest state of the group. Hence,
the overall metric count should be 0.
- IntStream.range(0, 5).forEach(__ ->
-
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group,
Collections.emptyMap()))
- );
- IntStream.range(0, 4).forEach(__ ->
-
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id"))
- );
- }
-
- @Test
- public void testOnConsumerGroupStateTransition() {
- GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
- .build();
-
- // Replaying a consumer group epoch record should increment metric.
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("group-id",
1));
- verify(context.metrics, times(1)).onConsumerGroupStateTransition(null,
ConsumerGroup.ConsumerGroupState.EMPTY);
-
- // Replaying a consumer group epoch record for a group that has
already been created should not increment metric.
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("group-id",
1));
- verify(context.metrics, times(1)).onConsumerGroupStateTransition(null,
ConsumerGroup.ConsumerGroupState.EMPTY);
-
- // Creating and replaying tombstones for a group should remove group
and decrement metric.
- List<CoordinatorRecord> tombstones = new ArrayList<>();
- Group group = context.groupMetadataManager.group("group-id");
- group.createGroupTombstoneRecords(tombstones);
- tombstones.forEach(context::replay);
- assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.group("group-id"));
- verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
null);
-
- // Replaying a tombstone for a group that has already been removed
should not decrement metric.
- tombstones.forEach(context::replay);
- verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
null);
- }
-
- @Test
- public void testOnConsumerGroupStateTransitionOnLoading() {
- GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
- .build();
-
- // Even if there are more group epoch records loaded than tombstone
records, the last replayed record
- // (tombstone in this test) is the latest state of the group. Hence,
the overall metric count should be 0.
- IntStream.range(0, 5).forEach(__ ->
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("group-id",
0))
- );
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("group-id"));
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("group-id"));
- IntStream.range(0, 3).forEach(__ -> {
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("group-id"));
-
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("group-id"));
- });
-
- verify(context.metrics, times(1)).onConsumerGroupStateTransition(null,
ConsumerGroup.ConsumerGroupState.EMPTY);
- verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
null);
- }
-
@Test
public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() {
String classicGroupId = "classic-group-id";
@@ -11153,8 +11113,6 @@ public class GroupMetadataManagerTest {
result.records()
);
- verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
null);
-
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
@@ -11340,8 +11298,6 @@ public class GroupMetadataManagerTest {
timeout.result.records()
);
- verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
null);
-
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
@@ -11545,8 +11501,6 @@ public class GroupMetadataManagerTest {
timeout.result.records()
);
- verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.RECONCILING,
null);
-
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
@@ -11780,8 +11734,6 @@ public class GroupMetadataManagerTest {
result.records
);
- verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
null);
-
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
@@ -14298,8 +14250,6 @@ public class GroupMetadataManagerTest {
leaveResult.records()
);
- verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
null);
-
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
index 06ffeaa84d1..7dc3a4d17f6 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
@@ -17,22 +17,15 @@
package org.apache.kafka.coordinator.group.metrics;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
-import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.timeline.SnapshotRegistry;
import com.yammer.metrics.core.MetricsRegistry;
import org.junit.jupiter.api.Test;
-import java.util.Collections;
-import java.util.stream.IntStream;
-
-import static
org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class GroupCoordinatorMetricsShardTest {
@@ -47,135 +40,18 @@ public class GroupCoordinatorMetricsShardTest {
GroupCoordinatorMetricsShard shard =
coordinatorMetrics.newMetricsShard(snapshotRegistry, tp);
shard.incrementNumOffsets();
-
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY);
-
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING);
-
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING);
-
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE);
-
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD);
snapshotRegistry.idempotentCreateSnapshot(1000);
// The value should not be updated until the offset has been committed.
assertEquals(0, shard.numOffsets());
- assertEquals(0, shard.numConsumerGroups());
- assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
- assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
- assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING));
- assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
- assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD));
shard.commitUpTo(1000);
assertEquals(1, shard.numOffsets());
- assertEquals(5, shard.numConsumerGroups());
- assertEquals(1,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
- assertEquals(1,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
- assertEquals(1,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING));
- assertEquals(1,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
- assertEquals(1,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD));
shard.decrementNumOffsets();
-
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY);
-
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING);
-
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING);
-
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE);
-
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD);
snapshotRegistry.idempotentCreateSnapshot(2000);
shard.commitUpTo(2000);
assertEquals(0, shard.numOffsets());
- assertEquals(0, shard.numConsumerGroups());
- assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
- assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
- assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING));
- assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
- assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD));
- }
-
- @Test
- public void testConsumerGroupStateTransitionMetrics() {
- MetricsRegistry registry = new MetricsRegistry();
- Metrics metrics = new Metrics();
- SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
- TopicPartition tp = new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0);
- GroupCoordinatorMetrics coordinatorMetrics = new
GroupCoordinatorMetrics(registry, metrics);
- GroupCoordinatorMetricsShard shard =
coordinatorMetrics.newMetricsShard(snapshotRegistry, tp);
- coordinatorMetrics.activateMetricsShard(shard);
-
- ConsumerGroup group0 = new ConsumerGroup(
- snapshotRegistry,
- "group-0",
- shard
- );
- ConsumerGroup group1 = new ConsumerGroup(
- snapshotRegistry,
- "group-1",
- shard
- );
- ConsumerGroup group2 = new ConsumerGroup(
- snapshotRegistry,
- "group-2",
- shard
- );
- ConsumerGroup group3 = new ConsumerGroup(
- snapshotRegistry,
- "group-3",
- shard
- );
-
- IntStream.range(0, 4).forEach(__ ->
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
-
- snapshotRegistry.idempotentCreateSnapshot(1000);
- shard.commitUpTo(1000);
- assertEquals(4, shard.numConsumerGroups());
- assertEquals(4,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
-
- ConsumerGroupMember member0 =
group0.getOrMaybeCreateMember("member-id", true);
- ConsumerGroupMember member1 =
group1.getOrMaybeCreateMember("member-id", true);
- ConsumerGroupMember member2 =
group2.getOrMaybeCreateMember("member-id", true);
- ConsumerGroupMember member3 =
group3.getOrMaybeCreateMember("member-id", true);
- group0.updateMember(member0);
- group1.updateMember(member1);
- group2.updateMember(member2);
- group3.updateMember(member3);
-
- snapshotRegistry.idempotentCreateSnapshot(2000);
- shard.commitUpTo(2000);
- assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
- assertEquals(4,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
-
- group2.setGroupEpoch(1);
- group3.setGroupEpoch(1);
-
- snapshotRegistry.idempotentCreateSnapshot(3000);
- shard.commitUpTo(3000);
- assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
- assertEquals(2,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
- assertEquals(2,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
-
- group2.setTargetAssignmentEpoch(1);
-
- // Set member2 to ASSIGNING state.
- new ConsumerGroupMember.Builder(member2)
-
.setPartitionsPendingRevocation(Collections.singletonMap(Uuid.ZERO_UUID,
Collections.singleton(0)))
- .build();
-
- snapshotRegistry.idempotentCreateSnapshot(4000);
- shard.commitUpTo(4000);
- assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
- assertEquals(1,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
- assertEquals(1,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING));
- assertEquals(2,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
-
- assertGaugeValue(metrics, metrics.metricName("group-count",
"group-coordinator-metrics",
- Collections.singletonMap("protocol", "consumer")), 4);
- assertGaugeValue(metrics, metrics.metricName("consumer-group-count",
"group-coordinator-metrics",
- Collections.singletonMap("state",
ConsumerGroup.ConsumerGroupState.EMPTY.toString())), 0);
- assertGaugeValue(metrics, metrics.metricName("consumer-group-count",
"group-coordinator-metrics",
- Collections.singletonMap("state",
ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())), 1);
- assertGaugeValue(metrics, metrics.metricName("consumer-group-count",
"group-coordinator-metrics",
- Collections.singletonMap("state",
ConsumerGroup.ConsumerGroupState.RECONCILING.toString())), 1);
- assertGaugeValue(metrics, metrics.metricName("consumer-group-count",
"group-coordinator-metrics",
- Collections.singletonMap("state",
ConsumerGroup.ConsumerGroupState.STABLE.toString())), 2);
- assertGaugeValue(metrics, metrics.metricName("consumer-group-count",
"group-coordinator-metrics",
- Collections.singletonMap("state",
ConsumerGroup.ConsumerGroupState.DEAD.toString())), 0);
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
index d04aa533873..05da88f9f7a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Map;
import java.util.stream.IntStream;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
@@ -181,9 +182,11 @@ public class GroupCoordinatorMetricsTest {
Utils.mkEntry(ClassicGroupState.DEAD, 1L)
));
- IntStream.range(0, 5).forEach(__ ->
shard0.incrementNumConsumerGroups(ConsumerGroupState.ASSIGNING));
- IntStream.range(0, 5).forEach(__ ->
shard1.incrementNumConsumerGroups(ConsumerGroupState.RECONCILING));
- IntStream.range(0, 3).forEach(__ ->
shard1.decrementNumConsumerGroups(ConsumerGroupState.DEAD));
+
shard0.setConsumerGroupGauges(Collections.singletonMap(ConsumerGroupState.ASSIGNING,
5L));
+ shard1.setConsumerGroupGauges(Map.of(
+ ConsumerGroupState.RECONCILING, 1L,
+ ConsumerGroupState.DEAD, 1L
+ ));
IntStream.range(0, 6).forEach(__ -> shard0.incrementNumOffsets());
IntStream.range(0, 2).forEach(__ -> shard1.incrementNumOffsets());
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index c2e091aa354..886820e5513 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -81,8 +81,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
public class ConsumerGroupTest {
@@ -426,7 +424,6 @@ public class ConsumerGroupTest {
@Test
public void testGroupState() {
- Uuid fooTopicId = Uuid.randomUuid();
ConsumerGroup consumerGroup = createConsumerGroup("foo");
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY,
consumerGroup.state());
@@ -1295,46 +1292,6 @@ public class ConsumerGroupTest {
assertEquals(expected, actual);
}
- @Test
- public void testStateTransitionMetrics() {
- // Confirm metrics is not updated when a new ConsumerGroup is created
but only when the group transitions
- // its state.
- GroupCoordinatorMetricsShard metrics =
mock(GroupCoordinatorMetricsShard.class);
- ConsumerGroup consumerGroup = new ConsumerGroup(
- new SnapshotRegistry(new LogContext()),
- "group-id",
- metrics
- );
-
- assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY,
consumerGroup.state());
- verify(metrics, times(0)).onConsumerGroupStateTransition(null,
ConsumerGroup.ConsumerGroupState.EMPTY);
-
- ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
- .setMemberEpoch(1)
- .setPreviousMemberEpoch(0)
- .build();
-
- consumerGroup.updateMember(member);
-
- assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING,
consumerGroup.state());
- verify(metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
ConsumerGroup.ConsumerGroupState.RECONCILING);
-
- consumerGroup.setGroupEpoch(1);
-
- assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING,
consumerGroup.state());
- verify(metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.RECONCILING,
ConsumerGroup.ConsumerGroupState.ASSIGNING);
-
- consumerGroup.setTargetAssignmentEpoch(1);
-
- assertEquals(ConsumerGroup.ConsumerGroupState.STABLE,
consumerGroup.state());
- verify(metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.ASSIGNING,
ConsumerGroup.ConsumerGroupState.STABLE);
-
- consumerGroup.removeMember("member");
-
- assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY,
consumerGroup.state());
- verify(metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
ConsumerGroup.ConsumerGroupState.EMPTY);
- }
-
@Test
public void testIsInStatesCaseInsensitive() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());