This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fa190cf18e6 MINOR: Only enable replay methods to modify timeline data 
structure (#15528)
fa190cf18e6 is described below

commit fa190cf18e6101b8e0212b6f79bb585ccfeb0437
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Fri Mar 15 08:24:59 2024 -0400

    MINOR: Only enable replay methods to modify timeline data structure (#15528)
    
    The patch prevents the main method (the method generating records) from 
modifying the timeline data structure `groups` when it calls 
`getOrMaybeCreateConsumerGroup` in the KIP-848 new group coordinator. Only 
replay methods are able to add the newly created group to `groups`.
    
    Reviewers: David Jacot <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    | 55 ++++++++++++++++++----
 .../group/GroupMetadataManagerTest.java            | 36 +++++++-------
 .../group/OffsetMetadataManagerTest.java           | 42 ++++++++---------
 3 files changed, 85 insertions(+), 48 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 26c2f644e96..48f0618c55d 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -583,7 +583,8 @@ public class GroupMetadataManager {
     }
 
     /**
-     * Gets or maybe creates a consumer group.
+     * Gets or maybe creates a consumer group without updating the groups map.
+     * The group will be materialized during the replay.
      *
      * @param groupId           The group id.
      * @param createIfNotExists A boolean indicating whether the group should 
be
@@ -605,6 +606,42 @@ public class GroupMetadataManager {
             throw new GroupIdNotFoundException(String.format("Consumer group 
%s not found.", groupId));
         }
 
+        if (group == null) {
+            return new ConsumerGroup(snapshotRegistry, groupId, metrics);
+        } else {
+            if (group.type() == CONSUMER) {
+                return (ConsumerGroup) group;
+            } else {
+                // We don't support upgrading/downgrading between protocols at 
the moment so
+                // we throw an exception if a group exists with the wrong type.
+                throw new GroupIdNotFoundException(String.format("Group %s is 
not a consumer group.", groupId));
+            }
+        }
+    }
+
+    /**
+     * The method should be called on the replay path.
+     * Gets or maybe creates a consumer group and updates the groups map if a 
new group is created.
+     *
+     * @param groupId           The group id.
+     * @param createIfNotExists A boolean indicating whether the group should 
be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroup.
+     * @throws IllegalStateException if the group does not exist and 
createIfNotExists is false or
+     *                               if the group is not a consumer group.
+     * Package private for testing.
+     */
+    ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
+        String groupId,
+        boolean createIfNotExists
+    ) throws GroupIdNotFoundException {
+        Group group = groups.get(groupId);
+
+        if (group == null && !createIfNotExists) {
+            throw new IllegalStateException(String.format("Consumer group %s 
not found.", groupId));
+        }
+
         if (group == null) {
             ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
             groups.put(groupId, consumerGroup);
@@ -616,7 +653,7 @@ public class GroupMetadataManager {
             } else {
                 // We don't support upgrading/downgrading between protocols at 
the moment so
                 // we throw an exception if a group exists with the wrong type.
-                throw new GroupIdNotFoundException(String.format("Group %s is 
not a consumer group.", groupId));
+                throw new IllegalStateException(String.format("Group %s is not 
a consumer group.", groupId));
             }
         }
     }
@@ -1551,7 +1588,7 @@ public class GroupMetadataManager {
         String groupId = key.groupId();
         String memberId = key.memberId();
 
-        ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, 
value != null);
+        ConsumerGroup consumerGroup = 
getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
         Set<String> oldSubscribedTopicNames = new 
HashSet<>(consumerGroup.subscribedTopicNames());
 
         if (value != null) {
@@ -1663,10 +1700,10 @@ public class GroupMetadataManager {
         String groupId = key.groupId();
 
         if (value != null) {
-            ConsumerGroup consumerGroup = 
getOrMaybeCreateConsumerGroup(groupId, true);
+            ConsumerGroup consumerGroup = 
getOrMaybeCreatePersistedConsumerGroup(groupId, true);
             consumerGroup.setGroupEpoch(value.epoch());
         } else {
-            ConsumerGroup consumerGroup = 
getOrMaybeCreateConsumerGroup(groupId, false);
+            ConsumerGroup consumerGroup = 
getOrMaybeCreatePersistedConsumerGroup(groupId, false);
             if (!consumerGroup.members().isEmpty()) {
                 throw new IllegalStateException("Received a tombstone record 
to delete group " + groupId
                     + " but the group still has " + 
consumerGroup.members().size() + " members.");
@@ -1698,7 +1735,7 @@ public class GroupMetadataManager {
         ConsumerGroupPartitionMetadataValue value
     ) {
         String groupId = key.groupId();
-        ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, 
false);
+        ConsumerGroup consumerGroup = 
getOrMaybeCreatePersistedConsumerGroup(groupId, false);
 
         if (value != null) {
             Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
@@ -1724,7 +1761,7 @@ public class GroupMetadataManager {
     ) {
         String groupId = key.groupId();
         String memberId = key.memberId();
-        ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, 
false);
+        ConsumerGroup consumerGroup = 
getOrMaybeCreatePersistedConsumerGroup(groupId, false);
 
         if (value != null) {
             consumerGroup.updateTargetAssignment(memberId, 
Assignment.fromRecord(value));
@@ -1746,7 +1783,7 @@ public class GroupMetadataManager {
         ConsumerGroupTargetAssignmentMetadataValue value
     ) {
         String groupId = key.groupId();
-        ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, 
false);
+        ConsumerGroup consumerGroup = 
getOrMaybeCreatePersistedConsumerGroup(groupId, false);
 
         if (value != null) {
             consumerGroup.setTargetAssignmentEpoch(value.assignmentEpoch());
@@ -1772,7 +1809,7 @@ public class GroupMetadataManager {
     ) {
         String groupId = key.groupId();
         String memberId = key.memberId();
-        ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, 
false);
+        ConsumerGroup consumerGroup = 
getOrMaybeCreatePersistedConsumerGroup(groupId, false);
         ConsumerGroupMember oldMember = 
consumerGroup.getOrMaybeCreateMember(memberId, false);
 
         if (value != null) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 66592417828..e9304407cdd 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -9112,43 +9112,43 @@ public class GroupMetadataManagerTest {
 
     @Test
     public void testConsumerGroupDelete() {
+        String groupId = "group-id";
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10))
             .build();
-        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", 
true);
 
         List<Record> expectedRecords = Arrays.asList(
-            RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id"),
-            
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord("group-id"),
-            RecordHelpers.newGroupEpochTombstoneRecord("group-id")
+            RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId),
+            RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId),
+            RecordHelpers.newGroupEpochTombstoneRecord(groupId)
         );
         List<Record> records = new ArrayList<>();
-        context.groupMetadataManager.deleteGroup("group-id", records);
+        context.groupMetadataManager.deleteGroup(groupId, records);
         assertEquals(expectedRecords, records);
     }
 
     @Test
     public void testConsumerGroupMaybeDelete() {
+        String groupId = "group-id";
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10))
             .build();
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", true);
 
         List<Record> expectedRecords = Arrays.asList(
-            RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id"),
-            
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord("group-id"),
-            RecordHelpers.newGroupEpochTombstoneRecord("group-id")
+            RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId),
+            RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId),
+            RecordHelpers.newGroupEpochTombstoneRecord(groupId)
         );
         List<Record> records = new ArrayList<>();
-        context.groupMetadataManager.maybeDeleteGroup("group-id", records);
+        context.groupMetadataManager.maybeDeleteGroup(groupId, records);
         assertEquals(expectedRecords, records);
 
         records = new ArrayList<>();
-        group.updateMember(new ConsumerGroupMember.Builder("member")
-            .setState(MemberState.STABLE)
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId, new 
ConsumerGroupMember.Builder("member")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(10)
-            .build()
-        );
-        context.groupMetadataManager.maybeDeleteGroup("group-id", records);
+            .build()));
+        context.groupMetadataManager.maybeDeleteGroup(groupId, records);
         assertEquals(Collections.emptyList(), records);
     }
 
@@ -9273,7 +9273,7 @@ public class GroupMetadataManagerTest {
         verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
 
         // Replaying a tombstone for a group that has already been removed 
should not decrement metric.
-        tombstones.forEach(tombstone -> 
assertThrows(GroupIdNotFoundException.class, () -> context.replay(tombstone)));
+        tombstones.forEach(tombstone -> 
assertThrows(IllegalStateException.class, () -> context.replay(tombstone)));
         verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
     }
 
@@ -9290,8 +9290,8 @@ public class GroupMetadataManagerTest {
         
context.replay(RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id"));
         context.replay(RecordHelpers.newGroupEpochTombstoneRecord("group-id"));
         IntStream.range(0, 3).forEach(__ -> {
-            assertThrows(GroupIdNotFoundException.class, () -> 
context.replay(RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id")));
-            assertThrows(GroupIdNotFoundException.class, () -> 
context.replay(RecordHelpers.newGroupEpochTombstoneRecord("group-id")));
+            assertThrows(IllegalStateException.class, () -> 
context.replay(RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id")));
+            assertThrows(IllegalStateException.class, () -> 
context.replay(RecordHelpers.newGroupEpochTombstoneRecord("group-id")));
         });
 
         verify(context.metrics, times(1)).onConsumerGroupStateTransition(null, 
ConsumerGroup.ConsumerGroupState.EMPTY);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index d9eacb46c3e..4b3d76bbd1c 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -194,7 +194,7 @@ public class OffsetMetadataManagerTest {
                         true
                     );
                 case CONSUMER:
-                    return groupMetadataManager.getOrMaybeCreateConsumerGroup(
+                    return 
groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
                         groupId,
                         true
                     );
@@ -1079,7 +1079,7 @@ public class OffsetMetadataManagerTest {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         // Create an empty group.
-        context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+        context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
             "foo",
             true
         );
@@ -1108,7 +1108,7 @@ public class OffsetMetadataManagerTest {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         // Create an empty group.
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
             "foo",
             true
         );
@@ -1153,7 +1153,7 @@ public class OffsetMetadataManagerTest {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         // Create an empty group.
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
             "foo",
             true
         );
@@ -1190,7 +1190,7 @@ public class OffsetMetadataManagerTest {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         // Create an empty group.
-        context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+        context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
             "foo",
             true
         );
@@ -1246,7 +1246,7 @@ public class OffsetMetadataManagerTest {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         // Create an empty group.
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
             "foo",
             true
         );
@@ -1316,7 +1316,7 @@ public class OffsetMetadataManagerTest {
             .build();
 
         // Create an empty group.
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
             "foo",
             true
         );
@@ -1393,7 +1393,7 @@ public class OffsetMetadataManagerTest {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         // Create an empty group.
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
             "foo",
             true
         );
@@ -1483,7 +1483,7 @@ public class OffsetMetadataManagerTest {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         // Create an empty group.
-        context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+        context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
             "foo",
             true
         );
@@ -1512,7 +1512,7 @@ public class OffsetMetadataManagerTest {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         // Create an empty group.
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
             "foo",
             true
         );
@@ -1775,7 +1775,7 @@ public class OffsetMetadataManagerTest {
     public void testFetchOffsetsAtDifferentCommittedOffset() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
-        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", 
true);
+        
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", 
true);
 
         assertEquals(0, context.lastWrittenOffset);
         context.commitOffset("group", "foo", 0, 100L, 1);
@@ -1916,7 +1916,7 @@ public class OffsetMetadataManagerTest {
     public void testFetchOffsetsWithPendingTransactionalOffsets() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
-        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", 
true);
+        
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", 
true);
 
         context.commitOffset("group", "foo", 0, 100L, 1);
         context.commitOffset("group", "foo", 1, 110L, 1);
@@ -2021,7 +2021,7 @@ public class OffsetMetadataManagerTest {
     public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
-        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", 
true);
+        
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", 
true);
 
         assertEquals(0, context.lastWrittenOffset);
         context.commitOffset("group", "foo", 0, 100L, 1);
@@ -2108,7 +2108,7 @@ public class OffsetMetadataManagerTest {
     public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
-        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", 
true);
+        
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", 
true);
 
         context.commitOffset("group", "foo", 0, 100L, 1);
         context.commitOffset("group", "foo", 1, 110L, 1);
@@ -2182,7 +2182,7 @@ public class OffsetMetadataManagerTest {
     public void testConsumerGroupOffsetFetchWithMemberIdAndEpoch() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
         // Create consumer group.
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", 
true);
         // Create member.
         group.getOrMaybeCreateMember("member", true);
         // Commit offset.
@@ -2217,7 +2217,7 @@ public class OffsetMetadataManagerTest {
     public void testConsumerGroupOffsetFetchFromAdminClient() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
         // Create consumer group.
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", 
true);
         // Create member.
         group.getOrMaybeCreateMember("member", true);
         // Commit offset.
@@ -2251,7 +2251,7 @@ public class OffsetMetadataManagerTest {
     @Test
     public void testConsumerGroupOffsetFetchWithUnknownMemberId() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
-        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", 
true);
+        
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", 
true);
 
         // Fetch offsets case.
         List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = 
Collections.singletonList(
@@ -2276,7 +2276,7 @@ public class OffsetMetadataManagerTest {
     @Test
     public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", 
true);
         group.getOrMaybeCreateMember("member", true);
 
         // Fetch offsets case.
@@ -2340,7 +2340,7 @@ public class OffsetMetadataManagerTest {
     @Test
     public void testConsumerGroupOffsetDelete() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
             "foo",
             true
         );
@@ -2352,7 +2352,7 @@ public class OffsetMetadataManagerTest {
     @Test
     public void testConsumerGroupOffsetDeleteWithErrors() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
             "foo",
             true
         );
@@ -2382,7 +2382,7 @@ public class OffsetMetadataManagerTest {
     @Test
     public void testConsumerGroupOffsetDeleteWithPendingTransactionalOffsets() 
{
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
             "foo",
             true
         );

Reply via email to