This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 52e1569ef22 KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction (4.0) (#21118)
52e1569ef22 is described below

commit 52e1569ef22cdbc4420fcdc19b2b9d4f4fab7da7
Author: Izzy Harker <[email protected]>
AuthorDate: Thu Dec 11 08:18:09 2025 -0600

    KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction (4.0) (#21118)
    
    Cherry-pick changes (https://github.com/apache/kafka/pull/20907) to 4.0
    
    Conflicts:
    -> Removed group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
    -> Removed group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
    -> group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
    - only added required ConsumerGroup tests and kept everything else the same
    -> Kept GroupCoordinatorMetricsShard argument in ConsumerGroup constructor
    
    Reviewers: Sean Quah <[email protected]>, Lucas Brutschy <[email protected]>, David Jacot <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |   9 +-
 .../group/modern/consumer/ConsumerGroup.java       |  40 +++++---
 .../group/GroupMetadataManagerTest.java            | 104 +++++++++++++++++++++
 .../group/classic/ClassicGroupTest.java            |   2 +
 .../group/modern/consumer/ConsumerGroupTest.java   |  64 +++++++++----
 5 files changed, 184 insertions(+), 35 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index d97479f1269..7089bb11287 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -694,10 +694,10 @@ public class GroupMetadataManager {
         }
 
         if (group == null) {
-            return new ConsumerGroup(snapshotRegistry, groupId, metrics);
+            return new ConsumerGroup(logContext, snapshotRegistry, groupId, 
metrics);
         } else if (createIfNotExists && maybeDeleteEmptyClassicGroup(group, 
records)) {
             log.info("[GroupId {}] Converted the empty classic group to a 
consumer group.", groupId);
-            return new ConsumerGroup(snapshotRegistry, groupId, metrics);
+            return new ConsumerGroup(logContext, snapshotRegistry, groupId, 
metrics);
         } else {
             if (group.type() == CONSUMER) {
                 return (ConsumerGroup) group;
@@ -766,7 +766,7 @@ public class GroupMetadataManager {
         }
 
         if (group == null) {
-            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+            ConsumerGroup consumerGroup = new ConsumerGroup(logContext, 
snapshotRegistry, groupId, metrics);
             groups.put(groupId, consumerGroup);
             return consumerGroup;
         } else if (group.type() == CONSUMER) {
@@ -776,7 +776,7 @@ public class GroupMetadataManager {
             // offsets if no group existed. Simple classic groups are not 
backed by any records
             // in the __consumer_offsets topic hence we can safely replace it 
here. Without this,
             // replaying consumer group records after offset commit records 
would not work.
-            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+            ConsumerGroup consumerGroup = new ConsumerGroup(logContext, 
snapshotRegistry, groupId, metrics);
             groups.put(groupId, consumerGroup);
             return consumerGroup;
         } else {
@@ -1118,6 +1118,7 @@ public class GroupMetadataManager {
         ConsumerGroup consumerGroup;
         try {
             consumerGroup = ConsumerGroup.fromClassicGroup(
+                logContext,
                 snapshotRegistry,
                 metrics,
                 classicGroup,
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index c0f8673eae6..ec1e5a8d960 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -29,6 +29,7 @@ import 
org.apache.kafka.common.message.ConsumerProtocolSubscription;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
@@ -50,6 +51,8 @@ import org.apache.kafka.timeline.TimelineHashMap;
 import org.apache.kafka.timeline.TimelineInteger;
 import org.apache.kafka.timeline.TimelineObject;
 
+import org.slf4j.Logger;
+
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
@@ -105,6 +108,11 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         }
     }
 
+    /**
+     * The logger.
+     */
+    private final Logger log;
+
     /**
      * The group state.
      */
@@ -153,11 +161,13 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
     private final TimelineHashMap<String, ResolvedRegularExpression> 
resolvedRegularExpressions;
 
     public ConsumerGroup(
+        LogContext logContext,
         SnapshotRegistry snapshotRegistry,
         String groupId,
         GroupCoordinatorMetricsShard metrics
     ) {
         super(snapshotRegistry, groupId);
+        this.log = logContext.logger(ConsumerGroup.class);
         this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
         this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
         this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -1040,7 +1050,6 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
      *
      * @param assignment    The assignment.
      * @param expectedEpoch The expected epoch.
-     * @throws IllegalStateException if the epoch does not match the expected 
one.
      * package-private for testing.
      */
     void removePartitionEpochs(
@@ -1051,11 +1060,12 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
             currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
                 if (partitionsOrNull != null) {
                     assignedPartitions.forEach(partitionId -> {
-                        Integer prevValue = 
partitionsOrNull.remove(partitionId);
-                        if (prevValue != expectedEpoch) {
-                            throw new IllegalStateException(
-                                String.format("Cannot remove the epoch %d from 
%s-%s because the partition is " +
-                                    "still owned at a different epoch %d", 
expectedEpoch, topicId, partitionId, prevValue));
+                        Integer prevValue = partitionsOrNull.get(partitionId);
+                        if (prevValue != null && prevValue == expectedEpoch) {
+                            partitionsOrNull.remove(partitionId);
+                        } else {
+                            log.debug("[GroupId {}] Cannot remove the epoch {} 
from {}-{} because the partition is " +
+                                    "still owned at a different epoch {}", 
groupId, expectedEpoch, topicId, partitionId, prevValue);
                         }
                     });
                     if (partitionsOrNull.isEmpty()) {
@@ -1064,9 +1074,9 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
                         return partitionsOrNull;
                     }
                 } else {
-                    throw new IllegalStateException(
-                        String.format("Cannot remove the epoch %d from %s 
because it does not have any epoch",
-                            expectedEpoch, topicId));
+                    log.debug("[GroupId {}] Cannot remove the epoch {} from {} 
because it does not have any epoch",
+                            groupId, expectedEpoch, topicId);
+                    return partitionsOrNull;
                 }
             });
         });
@@ -1077,7 +1087,7 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
      *
      * @param assignment    The assignment.
      * @param epoch         The new epoch.
-     * @throws IllegalStateException if the partition already has an epoch 
assigned.
+     * @throws IllegalStateException if updating a partition with a smaller or 
equal epoch.
      * package-private for testing.
      */
     void addPartitionEpochs(
@@ -1090,8 +1100,10 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
                     partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedPartitions.size());
                 }
                 for (Integer partitionId : assignedPartitions) {
-                    Integer prevValue = partitionsOrNull.put(partitionId, 
epoch);
-                    if (prevValue != null) {
+                    Integer prevValue = partitionsOrNull.get(partitionId);
+                    if (prevValue == null || prevValue < epoch) {
+                        partitionsOrNull.put(partitionId, epoch);
+                    } else {
                         throw new IllegalStateException(
                             String.format("Cannot set the epoch of %s-%s to %d 
because the partition is " +
                                 "still owned at epoch %d", topicId, 
partitionId, epoch, prevValue));
@@ -1127,6 +1139,7 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
     /**
      * Create a new consumer group according to the given classic group.
      *
+     * @param logContext        The log context.
      * @param snapshotRegistry  The SnapshotRegistry.
      * @param metrics           The GroupCoordinatorMetricsShard.
      * @param classicGroup      The converted classic group.
@@ -1138,6 +1151,7 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
      * @throws UnsupportedVersionException if userData from a custom assignor 
would be lost.
      */
     public static ConsumerGroup fromClassicGroup(
+        LogContext logContext,
         SnapshotRegistry snapshotRegistry,
         GroupCoordinatorMetricsShard metrics,
         ClassicGroup classicGroup,
@@ -1145,7 +1159,7 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         ClusterImage clusterImage
     ) {
         String groupId = classicGroup.groupId();
-        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+        ConsumerGroup consumerGroup = new ConsumerGroup(logContext, 
snapshotRegistry, groupId, metrics);
         consumerGroup.setGroupEpoch(classicGroup.generationId());
         consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
 
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 6b150d09c4f..ea73b44590c 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -16848,6 +16848,110 @@ public class GroupMetadataManagerTest {
         );
     }
 
+    @Test
+    public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() 
{
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        Uuid topicId = Uuid.randomUuid();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition 0.
+        // 2. Member A is unassigned partition 0 [record removed by 
compaction].
+        // 3. Member B is assigned partition 0. 
+        // 4. Member A is assigned partition 1. 
+        // If record 2 is processed, there are no issues, however with 
compaction it is possible that 
+        // unassignment records are removed. We would like to not fail in 
these cases.
+        // Therefore we will allow assignments to owned partitions as long as 
the epoch is larger. 
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0's owner is replaced by member B at epoch 12.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+            .build()));
+
+        // Partition 0 must remain with member B at epoch 12 even though 
member A has just been unassigned partition 0.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+            .build()));
+
+        // Verify partition epochs.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+        assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+    }
+
+    @Test
+    public void 
testReplayConsumerGroupCurrentMemberAssignmentUnownedTopicWithCompaction() {
+        String groupId = "fooup";
+        String memberIdA = "memberIdA";
+        String memberIdB = "memberIdB";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+
+        // This test enacts the following scenario:
+        // 1. Member A is assigned partition foo-0.
+        // 2. Member A is unassigned partition foo-0 [record removed by 
compaction].
+        // 3. Member B is assigned partition foo-0.
+        // 4. Member B is unassigned partition foo-0. 
+        // 5. Member A is assigned partition bar-0. 
+        // This is a legitimate set of assignments but with compaction the 
unassignment record can be skipped.
+        // This can lead to conflicts from updating an owned partition in step 
3 and attempting 
+        // to remove nonexistent ownership in step 5. We want to ensure 
removing ownership from a 
+        // completely unowned partition in step 5 is allowed.  
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        // foo-0's owner is replaced by member B at epoch 12.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(12)
+            .setPreviousMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build()));
+
+        // foo becomes unowned.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdB)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(13)
+            .setPreviousMemberEpoch(12)
+            .build()));
+
+        // Member A is unassigned foo-0.
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 new ConsumerGroupMember.Builder(memberIdA)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(14)
+            .setPreviousMemberEpoch(13)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 
0)))
+            .build()));
+
+        // Verify foo-0 is unowned and bar-0 is owned by member A at epoch 14.
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        assertEquals(-1, group.currentPartitionEpoch(fooTopicId, 0));
+        assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+    }
+
     private static void checkJoinGroupResponse(
         JoinGroupResponseData expectedResponse,
         JoinGroupResponseData actualResponse,
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index ba27e555b07..6705fea7d10 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -1383,6 +1383,7 @@ public class ClassicGroupTest {
             .build();
 
         ConsumerGroup consumerGroup = new ConsumerGroup(
+            logContext,
             new SnapshotRegistry(logContext),
             groupId,
             mock(GroupCoordinatorMetricsShard.class)
@@ -1536,6 +1537,7 @@ public class ClassicGroupTest {
             .build();
 
         ConsumerGroup consumerGroup = new ConsumerGroup(
+            logContext,
             new SnapshotRegistry(logContext),
             groupId,
             mock(GroupCoordinatorMetricsShard.class)
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 8153a84d397..66525468fe9 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -87,6 +87,7 @@ public class ConsumerGroupTest {
     private ConsumerGroup createConsumerGroup(String groupId) {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
         return new ConsumerGroup(
+            new LogContext(),
             snapshotRegistry,
             groupId,
             mock(GroupCoordinatorMetricsShard.class)
@@ -282,14 +283,26 @@ public class ConsumerGroupTest {
         consumerGroup.updateMember(m1);
 
         ConsumerGroupMember m2 = new ConsumerGroupMember.Builder("m2")
+            .setMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 1)))
+            .build();
+
+        // m2 can acquire foo-1 because the epoch is larger than m1's epoch.
+        // This should not throw IllegalStateException.
+        consumerGroup.updateMember(m2);
+
+        ConsumerGroupMember m3 = new ConsumerGroupMember.Builder("m3")
             .setMemberEpoch(10)
             .setAssignedPartitions(mkAssignment(
                 mkTopicAssignment(fooTopicId, 1)))
             .build();
 
-        // m2 should not be able to acquire foo-1 because the partition is
-        // still owned by another member.
-        assertThrows(IllegalStateException.class, () -> 
consumerGroup.updateMember(m2));
+        // m3 should not be able to acquire foo-1 because the epoch is smaller 
+        // than the current partition epoch (11).
+        assertThrows(IllegalStateException.class, () -> {
+            consumerGroup.updateMember(m3);
+        });
     }
 
     @Test
@@ -297,13 +310,13 @@ public class ConsumerGroupTest {
         Uuid fooTopicId = Uuid.randomUuid();
         ConsumerGroup consumerGroup = createConsumerGroup("foo");
 
-        // Removing should fail because there is no epoch set.
-        assertThrows(IllegalStateException.class, () -> 
consumerGroup.removePartitionEpochs(
+        // Removing should be a no-op when there is no epoch set.
+        consumerGroup.removePartitionEpochs(
             mkAssignment(
                 mkTopicAssignment(fooTopicId, 1)
             ),
             10
-        ));
+        );
 
         ConsumerGroupMember m1 = new ConsumerGroupMember.Builder("m1")
             .setMemberEpoch(10)
@@ -313,13 +326,15 @@ public class ConsumerGroupTest {
 
         consumerGroup.updateMember(m1);
 
-        // Removing should fail because the expected epoch is incorrect.
-        assertThrows(IllegalStateException.class, () -> 
consumerGroup.removePartitionEpochs(
+        // Removing with incorrect epoch should do nothing. 
+        // A debug message is logged, no exception is thrown.
+        consumerGroup.removePartitionEpochs(
             mkAssignment(
                 mkTopicAssignment(fooTopicId, 1)
             ),
             11
-        ));
+        );
+        assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
     }
 
     @Test
@@ -334,14 +349,24 @@ public class ConsumerGroupTest {
             10
         );
 
-        // Changing the epoch should fail because the owner of the partition
-        // should remove it first.
-        assertThrows(IllegalStateException.class, () -> 
consumerGroup.addPartitionEpochs(
+        // Updating to a larger epoch should succeed.
+        consumerGroup.addPartitionEpochs(
             mkAssignment(
                 mkTopicAssignment(fooTopicId, 1)
             ),
             11
-        ));
+        );
+        assertEquals(11, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+
+        // Updating to a smaller epoch should fail.
+        assertThrows(IllegalStateException.class, () -> {
+            consumerGroup.addPartitionEpochs(
+                mkAssignment(
+                    mkTopicAssignment(fooTopicId, 1)
+                ),
+                10
+            );
+        });
     }
 
     @Test
@@ -904,7 +929,7 @@ public class ConsumerGroupTest {
     public void testUpdateInvertedAssignment() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
         GroupCoordinatorMetricsShard metricsShard = 
mock(GroupCoordinatorMetricsShard.class);
-        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
"test-group", metricsShard);
+        ConsumerGroup consumerGroup = new ConsumerGroup(new LogContext(), 
snapshotRegistry, "test-group", metricsShard);
         Uuid topicId = Uuid.randomUuid();
         String memberId1 = "member1";
         String memberId2 = "member2";
@@ -1124,7 +1149,7 @@ public class ConsumerGroupTest {
             Collections.emptyMap(),
             new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
         );
-        ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", 
metricsShard);
+        ConsumerGroup group = new ConsumerGroup(new LogContext(), 
snapshotRegistry, "group-foo", metricsShard);
         snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), 
group.stateAsString(0));
         group.updateMember(new ConsumerGroupMember.Builder("member1")
@@ -1139,6 +1164,7 @@ public class ConsumerGroupTest {
     public void testValidateOffsetFetch() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
         ConsumerGroup group = new ConsumerGroup(
+            new LogContext(), 
             snapshotRegistry,
             "group-foo",
             mock(GroupCoordinatorMetricsShard.class)
@@ -1200,7 +1226,7 @@ public class ConsumerGroupTest {
         long commitTimestamp = 20000L;
         long offsetsRetentionMs = 10000L;
         OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, 
OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
-        ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new 
LogContext()), "group-id", mock(GroupCoordinatorMetricsShard.class));
+        ConsumerGroup group = new ConsumerGroup(new LogContext(), new 
SnapshotRegistry(new LogContext()), "group-id", 
mock(GroupCoordinatorMetricsShard.class));
 
         Optional<OffsetExpirationCondition> offsetExpirationCondition = 
group.offsetExpirationCondition();
         assertTrue(offsetExpirationCondition.isPresent());
@@ -1258,7 +1284,7 @@ public class ConsumerGroupTest {
     @Test
     public void testAsDescribedGroup() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
-        ConsumerGroup group = new ConsumerGroup(snapshotRegistry, 
"group-id-1", mock(GroupCoordinatorMetricsShard.class));
+        ConsumerGroup group = new ConsumerGroup(new LogContext(), 
snapshotRegistry, "group-id-1", mock(GroupCoordinatorMetricsShard.class));
         snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), 
group.stateAsString(0));
 
@@ -1300,7 +1326,7 @@ public class ConsumerGroupTest {
             Collections.emptyMap(),
             new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
         );
-        ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", 
metricsShard);
+        ConsumerGroup group = new ConsumerGroup(new LogContext(), 
snapshotRegistry, "group-foo", metricsShard);
         snapshotRegistry.idempotentCreateSnapshot(0);
         assertTrue(group.isInStates(Collections.singleton("empty"), 0));
         assertFalse(group.isInStates(Collections.singleton("Empty"), 0));
@@ -1530,6 +1556,7 @@ public class ConsumerGroupTest {
         classicGroup.add(member);
 
         ConsumerGroup consumerGroup = ConsumerGroup.fromClassicGroup(
+            logContext,
             new SnapshotRegistry(logContext),
             mock(GroupCoordinatorMetricsShard.class),
             classicGroup,
@@ -1538,6 +1565,7 @@ public class ConsumerGroupTest {
         );
 
         ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
+            new LogContext(), 
             new SnapshotRegistry(logContext),
             groupId,
             mock(GroupCoordinatorMetricsShard.class)

Reply via email to