This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 5240f8a3123 KAFKA-19862: Group coordinator loading may fail when there 
is concurrent compaction (#20907)
5240f8a3123 is described below

commit 5240f8a31239cad48c0470de812ba1859eec54c6
Author: Izzy Harker <[email protected]>
AuthorDate: Wed Dec 10 01:51:31 2025 -0600

    KAFKA-19862: Group coordinator loading may fail when there is concurrent 
compaction (#20907)
    
    When the group coordinator loads with concurrent compaction, the record
    unassigning a partition/task can be missed, which leads to an
    IllegalStateException being thrown. However, the records self-resolve by
    the time loading is finished. This PR changes the coordinator to log a
    debug message and proceed with loading instead of throwing an
    IllegalStateException.
    
    Testing: Added unit tests for Consumer and Streams groups to ensure this
    change does not impact loading success.
    
    Reviewers: Sean Quah <[email protected]>, Lucas Brutschy
     <[email protected]>, David Jacot <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |   9 +-
 .../group/modern/consumer/ConsumerGroup.java       |  40 ++--
 .../coordinator/group/streams/StreamsGroup.java    |  41 ++--
 .../group/GroupCoordinatorShardTest.java           |   4 +-
 .../group/GroupMetadataManagerTest.java            | 232 +++++++++++++++++++++
 .../group/classic/ClassicGroupTest.java            |   2 +
 .../group/modern/consumer/ConsumerGroupTest.java   |  64 ++++--
 .../group/streams/StreamsGroupTest.java            |  42 ++--
 8 files changed, 358 insertions(+), 76 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 129d6794d69..6bf963c968e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -815,10 +815,10 @@ public class GroupMetadataManager {
         }
 
         if (group == null) {
-            return new ConsumerGroup(snapshotRegistry, groupId);
+            return new ConsumerGroup(logContext, snapshotRegistry, groupId);
         } else if (createIfNotExists && maybeDeleteEmptyClassicGroup(group, 
records)) {
             log.info("[GroupId {}] Converted the empty classic group to a 
consumer group.", groupId);
-            return new ConsumerGroup(snapshotRegistry, groupId);
+            return new ConsumerGroup(logContext, snapshotRegistry, groupId);
         } else {
             if (group.type() == CONSUMER) {
                 return (ConsumerGroup) group;
@@ -982,7 +982,7 @@ public class GroupMetadataManager {
         }
 
         if (group == null) {
-            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId);
+            ConsumerGroup consumerGroup = new ConsumerGroup(logContext, 
snapshotRegistry, groupId);
             groups.put(groupId, consumerGroup);
             return consumerGroup;
         } else if (group.type() == CONSUMER) {
@@ -992,7 +992,7 @@ public class GroupMetadataManager {
             // offsets if no group existed. Simple classic groups are not 
backed by any records
             // in the __consumer_offsets topic hence we can safely replace it 
here. Without this,
             // replaying consumer group records after offset commit records 
would not work.
-            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId);
+            ConsumerGroup consumerGroup = new ConsumerGroup(logContext, 
snapshotRegistry, groupId);
             groups.put(groupId, consumerGroup);
             return consumerGroup;
         } else {
@@ -1371,6 +1371,7 @@ public class GroupMetadataManager {
         ConsumerGroup consumerGroup;
         try {
             consumerGroup = ConsumerGroup.fromClassicGroup(
+                logContext,
                 snapshotRegistry,
                 classicGroup,
                 topicHashCache,
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index a32efe9e027..8aa858ced02 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -29,6 +29,7 @@ import 
org.apache.kafka.common.message.ConsumerProtocolSubscription;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.group.CommitPartitionValidator;
@@ -49,6 +50,8 @@ import org.apache.kafka.timeline.TimelineHashMap;
 import org.apache.kafka.timeline.TimelineInteger;
 import org.apache.kafka.timeline.TimelineObject;
 
+import org.slf4j.Logger;
+
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
@@ -104,6 +107,11 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         }
     }
 
+    /**
+     * The logger.
+     */
+    private final Logger log;
+
     /**
      * The group state.
      */
@@ -149,10 +157,12 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
     private final TimelineObject<Boolean> hasSubscriptionMetadataRecord;
 
     public ConsumerGroup(
+        LogContext logContext,
         SnapshotRegistry snapshotRegistry,
         String groupId
     ) {
         super(snapshotRegistry, groupId);
+        this.log = logContext.logger(ConsumerGroup.class);
         this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
         this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
         this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -1037,7 +1047,6 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
      *
      * @param assignment    The assignment.
      * @param expectedEpoch The expected epoch.
-     * @throws IllegalStateException if the epoch does not match the expected 
one.
      * package-private for testing.
      */
     void removePartitionEpochs(
@@ -1048,11 +1057,12 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
             currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
                 if (partitionsOrNull != null) {
                     assignedPartitions.forEach(partitionId -> {
-                        Integer prevValue = 
partitionsOrNull.remove(partitionId);
-                        if (prevValue != expectedEpoch) {
-                            throw new IllegalStateException(
-                                String.format("Cannot remove the epoch %d from 
%s-%s because the partition is " +
-                                    "still owned at a different epoch %d", 
expectedEpoch, topicId, partitionId, prevValue));
+                        Integer prevValue = partitionsOrNull.get(partitionId);
+                        if (prevValue != null && prevValue == expectedEpoch) {
+                            partitionsOrNull.remove(partitionId);
+                        } else {
+                            log.debug("[GroupId {}] Cannot remove the epoch {} 
from {}-{} because the partition is " +
+                                    "still owned at a different epoch {}", 
groupId, expectedEpoch, topicId, partitionId, prevValue);
                         }
                     });
                     if (partitionsOrNull.isEmpty()) {
@@ -1061,9 +1071,9 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
                         return partitionsOrNull;
                     }
                 } else {
-                    throw new IllegalStateException(
-                        String.format("Cannot remove the epoch %d from %s 
because it does not have any epoch",
-                            expectedEpoch, topicId));
+                    log.debug("[GroupId {}] Cannot remove the epoch {} from {} 
because it does not have any epoch",
+                            groupId, expectedEpoch, topicId);
+                    return partitionsOrNull;
                 }
             });
         });
@@ -1074,7 +1084,7 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
      *
      * @param assignment    The assignment.
      * @param epoch         The new epoch.
-     * @throws IllegalStateException if the partition already has an epoch 
assigned.
+     * @throws IllegalStateException if updating a partition with a smaller or 
equal epoch.
      * package-private for testing.
      */
     void addPartitionEpochs(
@@ -1087,8 +1097,10 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
                     partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedPartitions.size());
                 }
                 for (Integer partitionId : assignedPartitions) {
-                    Integer prevValue = partitionsOrNull.put(partitionId, 
epoch);
-                    if (prevValue != null) {
+                    Integer prevValue = partitionsOrNull.get(partitionId);
+                    if (prevValue == null || prevValue < epoch) {
+                        partitionsOrNull.put(partitionId, epoch);
+                    } else {
                         throw new IllegalStateException(
                             String.format("Cannot set the epoch of %s-%s to %d 
because the partition is " +
                                 "still owned at epoch %d", topicId, 
partitionId, epoch, prevValue));
@@ -1124,6 +1136,7 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
     /**
      * Create a new consumer group according to the given classic group.
      *
+     * @param logContext        The log context.
      * @param snapshotRegistry  The SnapshotRegistry.
      * @param classicGroup      The converted classic group.
      * @param topicHashCache    The cache for topic hashes.
@@ -1134,13 +1147,14 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
      * @throws UnsupportedVersionException if userData from a custom assignor 
would be lost.
      */
     public static ConsumerGroup fromClassicGroup(
+        LogContext logContext,
         SnapshotRegistry snapshotRegistry,
         ClassicGroup classicGroup,
         Map<String, Long> topicHashCache,
         CoordinatorMetadataImage metadataImage
     ) {
         String groupId = classicGroup.groupId();
-        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId);
+        ConsumerGroup consumerGroup = new ConsumerGroup(logContext, 
snapshotRegistry, groupId);
         consumerGroup.setGroupEpoch(classicGroup.generationId());
         consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 82dd871b3cd..621a8f9d1bd 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -947,7 +947,7 @@ public class StreamsGroup implements Group {
      *
      * @param assignment    The assignment.
      * @param expectedProcessId The expected process ID.
-     * @throws IllegalStateException if the process ID does not match the 
expected one. package-private for testing.
+     * package-private for testing.
      */
     private void removeTaskProcessIds(
         Map<String, Map<Integer, Integer>> assignment,
@@ -958,11 +958,12 @@ public class StreamsGroup implements Group {
             currentTasksProcessId.compute(subtopologyId, (__, 
partitionsOrNull) -> {
                 if (partitionsOrNull != null) {
                     assignedPartitions.keySet().forEach(partitionId -> {
-                        String prevValue = 
partitionsOrNull.remove(partitionId);
-                        if (!Objects.equals(prevValue, expectedProcessId)) {
-                            throw new IllegalStateException(
-                                String.format("Cannot remove the process ID %s 
from task %s_%s because the partition is " +
-                                    "still owned at a different process ID 
%s", expectedProcessId, subtopologyId, partitionId, prevValue));
+                        String prevValue = partitionsOrNull.get(partitionId);
+                        if (Objects.equals(prevValue, expectedProcessId)) {
+                            partitionsOrNull.remove(partitionId);
+                        } else {
+                            log.debug("[GroupId {}] Cannot remove the process 
ID {} from task {}_{} because the partition is " +
+                                    "still owned at a different process ID 
{}", groupId, expectedProcessId, subtopologyId, partitionId, prevValue);
                         }
                     });
                     if (partitionsOrNull.isEmpty()) {
@@ -971,9 +972,9 @@ public class StreamsGroup implements Group {
                         return partitionsOrNull;
                     }
                 } else {
-                    throw new IllegalStateException(
-                        String.format("Cannot remove the process ID %s from %s 
because it does not have any processId",
-                            expectedProcessId, subtopologyId));
+                    log.debug("[GroupId {}] Cannot remove the process ID {} 
from {} because it does not have any processId",
+                            groupId, expectedProcessId, subtopologyId);
+                    return partitionsOrNull;
                 }
             });
         });
@@ -984,7 +985,7 @@ public class StreamsGroup implements Group {
      *
      * @param assignment    The assignment.
      * @param processIdToRemove The expected process ID.
-     * @throws IllegalStateException if the process ID does not match the 
expected one. package-private for testing.
+     * package-private for testing.
      */
     private void removeTaskProcessIdsFromSet(
         Map<String, Set<Integer>> assignment,
@@ -995,10 +996,9 @@ public class StreamsGroup implements Group {
             currentTasksProcessId.compute(subtopologyId, (__, 
partitionsOrNull) -> {
                 if (partitionsOrNull != null) {
                     assignedPartitions.forEach(partitionId -> {
-                        if 
(!partitionsOrNull.get(partitionId).remove(processIdToRemove)) {
-                            throw new IllegalStateException(
-                                String.format("Cannot remove the process ID %s 
from task %s_%s because the task is " +
-                                    "not owned by this process ID", 
processIdToRemove, subtopologyId, partitionId));
+                        if (!partitionsOrNull.containsKey(partitionId) || 
!partitionsOrNull.get(partitionId).remove(processIdToRemove)) {
+                            log.debug("[GroupId {}] Cannot remove the process 
ID {} from task {}_{} because the task is " +
+                                    "not owned by this process ID", groupId, 
processIdToRemove, subtopologyId, partitionId);
                         }
                     });
                     if (partitionsOrNull.isEmpty()) {
@@ -1007,9 +1007,9 @@ public class StreamsGroup implements Group {
                         return partitionsOrNull;
                     }
                 } else {
-                    throw new IllegalStateException(
-                        String.format("Cannot remove the process ID %s from %s 
because it does not have any process ID",
-                            processIdToRemove, subtopologyId));
+                    log.debug("[GroupId {}] Cannot remove the process ID {} 
from {} because it does not have any process ID",
+                            groupId, processIdToRemove, subtopologyId);
+                    return partitionsOrNull;
                 }
             });
         });
@@ -1020,7 +1020,7 @@ public class StreamsGroup implements Group {
      *
      * @param tasks     The assigned tasks.
      * @param processId The process ID.
-     * @throws IllegalStateException if the partition already has an epoch 
assigned. package-private for testing.
+     * package-private for testing.
      */
     void addTaskProcessId(
         TasksTupleWithEpochs tasks,
@@ -1046,9 +1046,8 @@ public class StreamsGroup implements Group {
                 for (Integer partitionId : 
assignedTaskPartitionsWithEpochs.keySet()) {
                     String prevValue = partitionsOrNull.put(partitionId, 
processId);
                     if (prevValue != null) {
-                        throw new IllegalStateException(
-                            String.format("Cannot set the process ID of %s-%s 
to %s because the partition is " +
-                                "still owned by process ID %s", subtopologyId, 
partitionId, processId, prevValue));
+                        log.debug("[GroupId {}] Setting the process ID of 
{}-{} to {} even though the partition is " +
+                            "still owned by process ID {}", groupId, 
subtopologyId, partitionId, processId, prevValue);
                     }
                 }
                 return partitionsOrNull;
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 4dc0c8ff78c..f533cffc742 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -1384,8 +1384,8 @@ public class GroupCoordinatorShardTest {
 
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
 
-        ConsumerGroup group1 = new ConsumerGroup(snapshotRegistry, "group-id");
-        ConsumerGroup group2 = new ConsumerGroup(snapshotRegistry, 
"other-group-id");
+        ConsumerGroup group1 = new ConsumerGroup(new LogContext(), 
snapshotRegistry, "group-id");
+        ConsumerGroup group2 = new ConsumerGroup(new LogContext(), 
snapshotRegistry, "other-group-id");
 
         when(groupMetadataManager.groupIds()).thenReturn(Set.of("group-id", 
"other-group-id"));
         when(groupMetadataManager.group("group-id")).thenReturn(group1);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 3f1ca4770b0..b8beedca616 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -24468,6 +24468,238 @@ public class GroupMetadataManagerTest {
         assertDoesNotThrow(() -> context.replay(record));
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() 
{
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        Uuid topicId = Uuid.randomUuid();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by 
compaction].
+        // 3. Member B is assigned partition 0. 
+        // 4. Member A is assigned partition 1. 
+        // If record 2 is processed, there are no issues, however with 
compaction it is possible that 
+        // unassignment records are removed. We would like to not fail in 
these cases.
+        // Therefore we will allow assignments to owned partitions as long as 
the epoch is larger. 
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though 
member A has just been unassigned partition 0.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void 
testReplayConsumerGroupCurrentMemberAssignmentUnownedTopicWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-0.
+        // 2. Member A is unassigned partition foo-0 [record removed by 
compaction].
+        // 3. Member B is assigned partition foo-0.
+        // 4. Member B is unassigned partition foo-0. 
+        // 5. Member A is assigned partition bar-0. 
+        // This is a legitimate set of assignments but with compaction the 
unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned partition in step 
3 and attempting 
+        // to remove nonexistent ownership in step 5. We want to ensure 
removing ownership from a 
+        // completely unowned partition in step 5 is allowed.  
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        // foo-0's owner is replaced by member B at epoch 12.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        // foo becomes unowned.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .build()));
+
+        // Member A is unassigned foo-0.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(14)
+            .setPreviousMemberEpoch(13)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 
0)))
+            .build()));
+
+        // Verify foo-0 is unowned and bar-0 is owned by member A at epoch 14.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(-1, group.currentPartitionEpoch(fooTopicId, 0));
+        assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+    }
+
+    @Test
+    public void testReplayStreamsGroupCurrentMemberAssignmentWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        String processIdA = "processIdA";
+        String processIdB = "processIdB";
+        String subtopologyId = "subtopology";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+        // Initialize members with process Ids.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 
+            streamsGroupMemberBuilderWithDefaults(memberIdA)
+                .setProcessId(processIdA)
+                .build()));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 
+            streamsGroupMemberBuilderWithDefaults(memberIdB)
+                .setProcessId(processIdB)
+                .build()));
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned task 0.
+        // 2. Member A is unassigned task 0 [record removed by compaction].
+        // 3. Member B is assigned task 0. 
+        // 4. Member A is assigned task 1. 
+        // If record 2 is processed, there are no issues, however with 
compaction it is possible that 
+        // unassignment records are removed. We would like to not fail in 
these cases.
+        // Therefore we will allow assignments to owned tasks as long as the 
epoch is larger.
+
+        // Assign task 0 to member A.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, 
Map.of(0, 11))))
+            .build()));
+
+        // Task 0's owner is replaced by member B at epoch 12.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, 
Map.of(0, 12))))
+            .build()));
+
+        // Task 0 must remain with member B at epoch 12 even though member A 
has just been unassigned task 0.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                    TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, 
Map.of(1, 13))))
+            .build()));
+
+        // Verify task 1 is assigned to member A and task 0 to member B.
+        StreamsGroup group = 
context.groupMetadataManager.streamsGroup(groupId);
+        assertEquals(processIdA, 
group.currentActiveTaskProcessId(subtopologyId, 1));
+        assertEquals(processIdB, 
group.currentActiveTaskProcessId(subtopologyId, 0));
+    }
+
+    @Test
+    public void 
testReplayStreamsGroupCurrentMemberAssignmentUnownedTopologyWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        String processIdA = "processIdA";
+        String processIdB = "processIdB";
+        String subtopologyFoo = "subtopologyFoo";
+        String subtopologyBar = "subtopologyBar";
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+        // Initialize members with process Ids.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 
+            streamsGroupMemberBuilderWithDefaults(memberIdA)
+                .setProcessId(processIdA)
+                .build()));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 
+            streamsGroupMemberBuilderWithDefaults(memberIdB)
+                .setProcessId(processIdB)
+                .build()));
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned task foo-0.
+        // 2. Member A is unassigned task foo-0 [record removed by compaction].
+        // 3. Member B is assigned task foo-0.
+        // 4. Member B is unassigned task foo-0. 
+        // 5. Member A is assigned task bar-0. 
+        // This is a legitimate set of assignments but with compaction the 
unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned subtopology in 
step 3 and attempting to remove
+        // nonexistent ownership in step 5. We want to ensure removing 
ownership from a 
+        // completely unowned subtopology in step 5 is allowed.  
+
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyFoo, 
Map.of(0, 11))))
+            .build()));
+
+        // foo-0's owner is replaced by member B at epoch 12.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyFoo, 
Map.of(0, 12))))
+            .build()));
+
+        // foo becomes unowned
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdB)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .build()));
+
+        // Member A is unassigned foo-0.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberIdA)
+            .setMemberEpoch(14)
+            .setPreviousMemberEpoch(13)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
 
+                TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyBar, 
Map.of(0, 14))))
+            .build()));
+
+        // Verify foo-0 is unassigned and bar-0 is assigned to member A.
+        StreamsGroup group = 
context.groupMetadataManager.streamsGroup(groupId);
+        assertEquals(null, group.currentActiveTaskProcessId(subtopologyFoo, 
0));
+        assertEquals(processIdA, 
group.currentActiveTaskProcessId(subtopologyBar, 0));
+    }
+
     private record PendingAssignmentCase(
         String description,
         String groupId,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index 24212d10982..445ba969648 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -1380,6 +1380,7 @@ public class ClassicGroupTest {
             .build();
 
         ConsumerGroup consumerGroup = new ConsumerGroup(
+            logContext,
             new SnapshotRegistry(logContext),
             groupId
         );
@@ -1532,6 +1533,7 @@ public class ClassicGroupTest {
             .build();
 
         ConsumerGroup consumerGroup = new ConsumerGroup(
+            logContext,
             new SnapshotRegistry(logContext),
             groupId
         );
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 5956e51b161..c8ada8fc5f4 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -87,6 +87,7 @@ public class ConsumerGroupTest {
     private ConsumerGroup createConsumerGroup(String groupId) {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
         return new ConsumerGroup(
+            new LogContext(),
             snapshotRegistry,
             groupId
         );
@@ -281,14 +282,26 @@ public class ConsumerGroupTest {
         consumerGroup.updateMember(m1);
 
         ConsumerGroupMember m2 = new ConsumerGroupMember.Builder("m2")
+            .setMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 1)))
+            .build();
+
+        // m2 can acquire foo-1 because the epoch is larger than m1's epoch.
+        // This should not throw IllegalStateException.
+        consumerGroup.updateMember(m2);
+
+        ConsumerGroupMember m3 = new ConsumerGroupMember.Builder("m3")
             .setMemberEpoch(10)
             .setAssignedPartitions(mkAssignment(
                 mkTopicAssignment(fooTopicId, 1)))
             .build();
 
-        // m2 should not be able to acquire foo-1 because the partition is
-        // still owned by another member.
-        assertThrows(IllegalStateException.class, () -> 
consumerGroup.updateMember(m2));
+        // m3 should not be able to acquire foo-1 because the epoch is smaller 
+        // than the current partition epoch (11).
+        assertThrows(IllegalStateException.class, () -> {
+            consumerGroup.updateMember(m3);
+        });
     }
 
     @Test
@@ -296,13 +309,13 @@ public class ConsumerGroupTest {
         Uuid fooTopicId = Uuid.randomUuid();
         ConsumerGroup consumerGroup = createConsumerGroup("foo");
 
-        // Removing should fail because there is no epoch set.
-        assertThrows(IllegalStateException.class, () -> 
consumerGroup.removePartitionEpochs(
+        // Removing should be a no-op when there is no epoch set.
+        consumerGroup.removePartitionEpochs(
             mkAssignment(
                 mkTopicAssignment(fooTopicId, 1)
             ),
             10
-        ));
+        );
 
         ConsumerGroupMember m1 = new ConsumerGroupMember.Builder("m1")
             .setMemberEpoch(10)
@@ -312,13 +325,15 @@ public class ConsumerGroupTest {
 
         consumerGroup.updateMember(m1);
 
-        // Removing should fail because the expected epoch is incorrect.
-        assertThrows(IllegalStateException.class, () -> 
consumerGroup.removePartitionEpochs(
+        // Removing with an incorrect epoch should be a no-op:
+        // a debug message is logged and no exception is thrown.
+        consumerGroup.removePartitionEpochs(
             mkAssignment(
                 mkTopicAssignment(fooTopicId, 1)
             ),
             11
-        ));
+        );
+        assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
     }
 
     @Test
@@ -333,14 +348,24 @@ public class ConsumerGroupTest {
             10
         );
 
-        // Changing the epoch should fail because the owner of the partition
-        // should remove it first.
-        assertThrows(IllegalStateException.class, () -> 
consumerGroup.addPartitionEpochs(
+        // Updating to a larger epoch should succeed.
+        consumerGroup.addPartitionEpochs(
             mkAssignment(
                 mkTopicAssignment(fooTopicId, 1)
             ),
             11
-        ));
+        );
+        assertEquals(11, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+
+        // Updating to a smaller epoch should fail.
+        assertThrows(IllegalStateException.class, () -> {
+            consumerGroup.addPartitionEpochs(
+                mkAssignment(
+                    mkTopicAssignment(fooTopicId, 1)
+                ),
+                10
+            );
+        });
     }
 
     @Test
@@ -696,7 +721,7 @@ public class ConsumerGroupTest {
     @Test
     public void testUpdateInvertedAssignment() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
-        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
"test-group");
+        ConsumerGroup consumerGroup = new ConsumerGroup(new LogContext(), 
snapshotRegistry, "test-group");
         Uuid topicId = Uuid.randomUuid();
         String memberId1 = "member1";
         String memberId2 = "member2";
@@ -911,7 +936,7 @@ public class ConsumerGroupTest {
     @Test
     public void testAsListedGroup() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
-        ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
+        ConsumerGroup group = new ConsumerGroup(new LogContext(), 
snapshotRegistry, "group-foo");
         snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), 
group.stateAsString(0));
         group.updateMember(new ConsumerGroupMember.Builder("member1")
@@ -926,6 +951,7 @@ public class ConsumerGroupTest {
     public void testValidateOffsetFetch() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
         ConsumerGroup group = new ConsumerGroup(
+            new LogContext(), 
             snapshotRegistry,
             "group-foo"
         );
@@ -986,7 +1012,7 @@ public class ConsumerGroupTest {
         long commitTimestamp = 20000L;
         long offsetsRetentionMs = 10000L;
         OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, 
OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID);
-        ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new 
LogContext()), "group-id");
+        ConsumerGroup group = new ConsumerGroup(new LogContext(), new 
SnapshotRegistry(new LogContext()), "group-id");
 
         Optional<OffsetExpirationCondition> offsetExpirationCondition = 
group.offsetExpirationCondition();
         assertTrue(offsetExpirationCondition.isPresent());
@@ -1023,7 +1049,7 @@ public class ConsumerGroupTest {
     @Test
     public void testAsDescribedGroup() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
-        ConsumerGroup group = new ConsumerGroup(snapshotRegistry, 
"group-id-1");
+        ConsumerGroup group = new ConsumerGroup(new LogContext(), 
snapshotRegistry, "group-id-1");
         snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), 
group.stateAsString(0));
 
@@ -1060,7 +1086,7 @@ public class ConsumerGroupTest {
     @Test
     public void testIsInStatesCaseInsensitive() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
-        ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
+        ConsumerGroup group = new ConsumerGroup(new LogContext(), 
snapshotRegistry, "group-foo");
         snapshotRegistry.idempotentCreateSnapshot(0);
         assertTrue(group.isInStates(Set.of("empty"), 0));
         assertFalse(group.isInStates(Set.of("Empty"), 0));
@@ -1290,6 +1316,7 @@ public class ConsumerGroupTest {
         classicGroup.add(member);
 
         ConsumerGroup consumerGroup = ConsumerGroup.fromClassicGroup(
+            logContext,
             new SnapshotRegistry(logContext),
             classicGroup,
             new HashMap<>(),
@@ -1297,6 +1324,7 @@ public class ConsumerGroupTest {
         );
 
         ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
+            new LogContext(), 
             new SnapshotRegistry(logContext),
             groupId
         );
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index ced7d7ee4e7..f015441f86f 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -342,7 +342,7 @@ public class StreamsGroupTest {
         StreamsGroup streamsGroup = createStreamsGroup("foo");
 
         StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1")
-            .setProcessId("process")
+            .setProcessId("process1")
             .setAssignedTasks(
                 new TasksTupleWithEpochs(
                     mkTasksPerSubtopologyWithCommonEpoch(10, 
mkTasks(fooSubtopologyId, 1)),
@@ -355,19 +355,20 @@ public class StreamsGroupTest {
         streamsGroup.updateMember(m1);
 
         StreamsGroupMember m2 = new StreamsGroupMember.Builder("m2")
-            .setProcessId("process")
+            .setProcessId("process2")
             .setAssignedTasks(
                 new TasksTupleWithEpochs(
-                    mkTasksPerSubtopologyWithCommonEpoch(10, 
mkTasks(fooSubtopologyId, 1)),
+                    mkTasksPerSubtopologyWithCommonEpoch(9, 
mkTasks(fooSubtopologyId, 1)),
                     Map.of(),
                     Map.of()
                 )
             )
             .build();
 
-        // m2 should not be able to acquire foo-1 because the partition is
-        // still owned by another member.
-        assertThrows(IllegalStateException.class, () -> 
streamsGroup.updateMember(m2));
+        // We allow m2 to acquire foo-1 despite the fact that m1 has ownership because the processId is different.
+        streamsGroup.updateMember(m2);
+
+        assertEquals("process2", 
streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
     }
 
 
@@ -377,11 +378,11 @@ public class StreamsGroupTest {
         String fooSubtopologyId = "foo-sub";
         StreamsGroup streamsGroup = createStreamsGroup("foo");
 
-        // Removing should fail because there is no epoch set.
-        assertThrows(IllegalStateException.class, () -> 
streamsGroup.removeTaskProcessIds(
+        // Removing should be a no-op when there is no process id set.
+        streamsGroup.removeTaskProcessIds(
             TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, 
mkTasks(fooSubtopologyId, 1)),
             "process"
-        ));
+        );
 
         StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1")
             .setProcessId("process")
@@ -390,11 +391,15 @@ public class StreamsGroupTest {
 
         streamsGroup.updateMember(m1);
 
-        // Removing should fail because the expected epoch is incorrect.
-        assertThrows(IllegalStateException.class, () -> 
streamsGroup.removeTaskProcessIds(
-            TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, 
mkTasks(fooSubtopologyId, 1)),
+        // Removing with an incorrect process id should be a no-op:
+        // a debug message is logged and no exception is thrown.
+        streamsGroup.removeTaskProcessIds(
+            TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 9, 
mkTasks(fooSubtopologyId, 1)),
             "process1"
-        ));
+        );
+        if (taskRole == TaskRole.ACTIVE) {
+            assertEquals("process", 
streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
+        }
     }
 
     @Test
@@ -411,16 +416,17 @@ public class StreamsGroupTest {
             "process"
         );
 
-        // Changing the epoch should fail because the owner of the partition
-        // should remove it first.
-        assertThrows(IllegalStateException.class, () -> 
streamsGroup.addTaskProcessId(
+        // We allow replacing with a different process id.
+        streamsGroup.addTaskProcessId(
             new TasksTupleWithEpochs(
                 mkTasksPerSubtopologyWithCommonEpoch(10, 
mkTasks(fooSubtopologyId, 1)),
                 mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
                 mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
             ),
-            "process"
-        ));
+            "process2"
+        );
+
+        assertEquals("process2", 
streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
     }
 
     @Test


Reply via email to