This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fa190cf18e6 MINOR: Only enable replay methods to modify timeline data
structure (#15528)
fa190cf18e6 is described below
commit fa190cf18e6101b8e0212b6f79bb585ccfeb0437
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Fri Mar 15 08:24:59 2024 -0400
MINOR: Only enable replay methods to modify timeline data structure (#15528)
This patch prevents the main method (the method generating records) in the
new KIP-848 group coordinator from modifying the timeline data structure
`groups` when it calls `getOrMaybeCreateConsumerGroup`. Only the replay
methods are able to add a newly created group to `groups`.
Reviewers: David Jacot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 55 ++++++++++++++++++----
.../group/GroupMetadataManagerTest.java | 36 +++++++-------
.../group/OffsetMetadataManagerTest.java | 42 ++++++++---------
3 files changed, 85 insertions(+), 48 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 26c2f644e96..48f0618c55d 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -583,7 +583,8 @@ public class GroupMetadataManager {
}
/**
- * Gets or maybe creates a consumer group.
+ * Gets or maybe creates a consumer group without updating the groups map.
+ * The group will be materialized during the replay.
*
* @param groupId The group id.
* @param createIfNotExists A boolean indicating whether the group should
be
@@ -605,6 +606,42 @@ public class GroupMetadataManager {
throw new GroupIdNotFoundException(String.format("Consumer group
%s not found.", groupId));
}
+ if (group == null) {
+ return new ConsumerGroup(snapshotRegistry, groupId, metrics);
+ } else {
+ if (group.type() == CONSUMER) {
+ return (ConsumerGroup) group;
+ } else {
+ // We don't support upgrading/downgrading between protocols at
the moment so
+ // we throw an exception if a group exists with the wrong type.
+ throw new GroupIdNotFoundException(String.format("Group %s is
not a consumer group.", groupId));
+ }
+ }
+ }
+
+ /**
+ * The method should be called on the replay path.
+ * Gets or maybe creates a consumer group and updates the groups map if a
new group is created.
+ *
+ * @param groupId The group id.
+ * @param createIfNotExists A boolean indicating whether the group should
be
+ * created if it does not exist.
+ *
+ * @return A ConsumerGroup.
+ * @throws IllegalStateException if the group does not exist and
createIfNotExists is false or
+ * if the group is not a consumer group.
+ * Package private for testing.
+ */
+ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
+ String groupId,
+ boolean createIfNotExists
+ ) throws GroupIdNotFoundException {
+ Group group = groups.get(groupId);
+
+ if (group == null && !createIfNotExists) {
+ throw new IllegalStateException(String.format("Consumer group %s
not found.", groupId));
+ }
+
if (group == null) {
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry,
groupId, metrics);
groups.put(groupId, consumerGroup);
@@ -616,7 +653,7 @@ public class GroupMetadataManager {
} else {
// We don't support upgrading/downgrading between protocols at
the moment so
// we throw an exception if a group exists with the wrong type.
- throw new GroupIdNotFoundException(String.format("Group %s is
not a consumer group.", groupId));
+ throw new IllegalStateException(String.format("Group %s is not
a consumer group.", groupId));
}
}
}
@@ -1551,7 +1588,7 @@ public class GroupMetadataManager {
String groupId = key.groupId();
String memberId = key.memberId();
- ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId,
value != null);
+ ConsumerGroup consumerGroup =
getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
Set<String> oldSubscribedTopicNames = new
HashSet<>(consumerGroup.subscribedTopicNames());
if (value != null) {
@@ -1663,10 +1700,10 @@ public class GroupMetadataManager {
String groupId = key.groupId();
if (value != null) {
- ConsumerGroup consumerGroup =
getOrMaybeCreateConsumerGroup(groupId, true);
+ ConsumerGroup consumerGroup =
getOrMaybeCreatePersistedConsumerGroup(groupId, true);
consumerGroup.setGroupEpoch(value.epoch());
} else {
- ConsumerGroup consumerGroup =
getOrMaybeCreateConsumerGroup(groupId, false);
+ ConsumerGroup consumerGroup =
getOrMaybeCreatePersistedConsumerGroup(groupId, false);
if (!consumerGroup.members().isEmpty()) {
throw new IllegalStateException("Received a tombstone record
to delete group " + groupId
+ " but the group still has " +
consumerGroup.members().size() + " members.");
@@ -1698,7 +1735,7 @@ public class GroupMetadataManager {
ConsumerGroupPartitionMetadataValue value
) {
String groupId = key.groupId();
- ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId,
false);
+ ConsumerGroup consumerGroup =
getOrMaybeCreatePersistedConsumerGroup(groupId, false);
if (value != null) {
Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
@@ -1724,7 +1761,7 @@ public class GroupMetadataManager {
) {
String groupId = key.groupId();
String memberId = key.memberId();
- ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId,
false);
+ ConsumerGroup consumerGroup =
getOrMaybeCreatePersistedConsumerGroup(groupId, false);
if (value != null) {
consumerGroup.updateTargetAssignment(memberId,
Assignment.fromRecord(value));
@@ -1746,7 +1783,7 @@ public class GroupMetadataManager {
ConsumerGroupTargetAssignmentMetadataValue value
) {
String groupId = key.groupId();
- ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId,
false);
+ ConsumerGroup consumerGroup =
getOrMaybeCreatePersistedConsumerGroup(groupId, false);
if (value != null) {
consumerGroup.setTargetAssignmentEpoch(value.assignmentEpoch());
@@ -1772,7 +1809,7 @@ public class GroupMetadataManager {
) {
String groupId = key.groupId();
String memberId = key.memberId();
- ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId,
false);
+ ConsumerGroup consumerGroup =
getOrMaybeCreatePersistedConsumerGroup(groupId, false);
ConsumerGroupMember oldMember =
consumerGroup.getOrMaybeCreateMember(memberId, false);
if (value != null) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 66592417828..e9304407cdd 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -9112,43 +9112,43 @@ public class GroupMetadataManagerTest {
@Test
public void testConsumerGroupDelete() {
+ String groupId = "group-id";
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10))
.build();
- context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id",
true);
List<Record> expectedRecords = Arrays.asList(
- RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id"),
-
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord("group-id"),
- RecordHelpers.newGroupEpochTombstoneRecord("group-id")
+ RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId),
+ RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId),
+ RecordHelpers.newGroupEpochTombstoneRecord(groupId)
);
List<Record> records = new ArrayList<>();
- context.groupMetadataManager.deleteGroup("group-id", records);
+ context.groupMetadataManager.deleteGroup(groupId, records);
assertEquals(expectedRecords, records);
}
@Test
public void testConsumerGroupMaybeDelete() {
+ String groupId = "group-id";
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10))
.build();
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", true);
List<Record> expectedRecords = Arrays.asList(
- RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id"),
-
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord("group-id"),
- RecordHelpers.newGroupEpochTombstoneRecord("group-id")
+ RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId),
+ RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId),
+ RecordHelpers.newGroupEpochTombstoneRecord(groupId)
);
List<Record> records = new ArrayList<>();
- context.groupMetadataManager.maybeDeleteGroup("group-id", records);
+ context.groupMetadataManager.maybeDeleteGroup(groupId, records);
assertEquals(expectedRecords, records);
records = new ArrayList<>();
- group.updateMember(new ConsumerGroupMember.Builder("member")
- .setState(MemberState.STABLE)
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId, new
ConsumerGroupMember.Builder("member")
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
- .build()
- );
- context.groupMetadataManager.maybeDeleteGroup("group-id", records);
+ .build()));
+ context.groupMetadataManager.maybeDeleteGroup(groupId, records);
assertEquals(Collections.emptyList(), records);
}
@@ -9273,7 +9273,7 @@ public class GroupMetadataManagerTest {
verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
null);
// Replaying a tombstone for a group that has already been removed
should not decrement metric.
- tombstones.forEach(tombstone ->
assertThrows(GroupIdNotFoundException.class, () -> context.replay(tombstone)));
+ tombstones.forEach(tombstone ->
assertThrows(IllegalStateException.class, () -> context.replay(tombstone)));
verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
null);
}
@@ -9290,8 +9290,8 @@ public class GroupMetadataManagerTest {
context.replay(RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id"));
context.replay(RecordHelpers.newGroupEpochTombstoneRecord("group-id"));
IntStream.range(0, 3).forEach(__ -> {
- assertThrows(GroupIdNotFoundException.class, () ->
context.replay(RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id")));
- assertThrows(GroupIdNotFoundException.class, () ->
context.replay(RecordHelpers.newGroupEpochTombstoneRecord("group-id")));
+ assertThrows(IllegalStateException.class, () ->
context.replay(RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id")));
+ assertThrows(IllegalStateException.class, () ->
context.replay(RecordHelpers.newGroupEpochTombstoneRecord("group-id")));
});
verify(context.metrics, times(1)).onConsumerGroupStateTransition(null,
ConsumerGroup.ConsumerGroupState.EMPTY);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index d9eacb46c3e..4b3d76bbd1c 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -194,7 +194,7 @@ public class OffsetMetadataManagerTest {
true
);
case CONSUMER:
- return groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ return
groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
groupId,
true
);
@@ -1079,7 +1079,7 @@ public class OffsetMetadataManagerTest {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
- context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
"foo",
true
);
@@ -1108,7 +1108,7 @@ public class OffsetMetadataManagerTest {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
"foo",
true
);
@@ -1153,7 +1153,7 @@ public class OffsetMetadataManagerTest {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
"foo",
true
);
@@ -1190,7 +1190,7 @@ public class OffsetMetadataManagerTest {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
- context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
"foo",
true
);
@@ -1246,7 +1246,7 @@ public class OffsetMetadataManagerTest {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
"foo",
true
);
@@ -1316,7 +1316,7 @@ public class OffsetMetadataManagerTest {
.build();
// Create an empty group.
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
"foo",
true
);
@@ -1393,7 +1393,7 @@ public class OffsetMetadataManagerTest {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
"foo",
true
);
@@ -1483,7 +1483,7 @@ public class OffsetMetadataManagerTest {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
- context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
"foo",
true
);
@@ -1512,7 +1512,7 @@ public class OffsetMetadataManagerTest {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
"foo",
true
);
@@ -1775,7 +1775,7 @@ public class OffsetMetadataManagerTest {
public void testFetchOffsetsAtDifferentCommittedOffset() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
- context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group",
true);
+
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group",
true);
assertEquals(0, context.lastWrittenOffset);
context.commitOffset("group", "foo", 0, 100L, 1);
@@ -1916,7 +1916,7 @@ public class OffsetMetadataManagerTest {
public void testFetchOffsetsWithPendingTransactionalOffsets() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
- context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group",
true);
+
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group",
true);
context.commitOffset("group", "foo", 0, 100L, 1);
context.commitOffset("group", "foo", 1, 110L, 1);
@@ -2021,7 +2021,7 @@ public class OffsetMetadataManagerTest {
public void testFetchAllOffsetsAtDifferentCommittedOffset() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
- context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group",
true);
+
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group",
true);
assertEquals(0, context.lastWrittenOffset);
context.commitOffset("group", "foo", 0, 100L, 1);
@@ -2108,7 +2108,7 @@ public class OffsetMetadataManagerTest {
public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
- context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group",
true);
+
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group",
true);
context.commitOffset("group", "foo", 0, 100L, 1);
context.commitOffset("group", "foo", 1, 110L, 1);
@@ -2182,7 +2182,7 @@ public class OffsetMetadataManagerTest {
public void testConsumerGroupOffsetFetchWithMemberIdAndEpoch() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create consumer group.
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group",
true);
// Create member.
group.getOrMaybeCreateMember("member", true);
// Commit offset.
@@ -2217,7 +2217,7 @@ public class OffsetMetadataManagerTest {
public void testConsumerGroupOffsetFetchFromAdminClient() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create consumer group.
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group",
true);
// Create member.
group.getOrMaybeCreateMember("member", true);
// Commit offset.
@@ -2251,7 +2251,7 @@ public class OffsetMetadataManagerTest {
@Test
public void testConsumerGroupOffsetFetchWithUnknownMemberId() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
- context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group",
true);
+
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group",
true);
// Fetch offsets case.
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics =
Collections.singletonList(
@@ -2276,7 +2276,7 @@ public class OffsetMetadataManagerTest {
@Test
public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group",
true);
group.getOrMaybeCreateMember("member", true);
// Fetch offsets case.
@@ -2340,7 +2340,7 @@ public class OffsetMetadataManagerTest {
@Test
public void testConsumerGroupOffsetDelete() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
"foo",
true
);
@@ -2352,7 +2352,7 @@ public class OffsetMetadataManagerTest {
@Test
public void testConsumerGroupOffsetDeleteWithErrors() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
"foo",
true
);
@@ -2382,7 +2382,7 @@ public class OffsetMetadataManagerTest {
@Test
public void testConsumerGroupOffsetDeleteWithPendingTransactionalOffsets()
{
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
"foo",
true
);