This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f621a635c15 KAFKA-19570: Implement offline migration for streams groups (#20288) f621a635c15 is described below commit f621a635c1546246b80ae5d4624241aa4b58fd39 Author: Lucas Brutschy <lbruts...@confluent.io> AuthorDate: Tue Aug 26 10:05:30 2025 +0200 KAFKA-19570: Implement offline migration for streams groups (#20288) Offline migration essentially preserves offsets and nothing else. So effectively write tombstones for classic group type when a streams heartbeat is sent to with the group ID of an empty classic group, and write tombstones for the streams group type when a classic consumer attempts to join with a group ID of an empty streams group. Reviewers: Bill Bejeck <bbej...@apache.org>, Sean Quah <sq...@confluent.io>, Dongnuo Lyu <d...@confluent.io> --- .../coordinator/group/GroupMetadataManager.java | 48 ++++++- .../group/GroupMetadataManagerTest.java | 150 +++++++++++++++++++++ 2 files changed, 195 insertions(+), 3 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 44380427284..f87af4897a7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -833,19 +833,28 @@ public class GroupMetadataManager { * Gets or creates a streams group without updating the groups map. * The group will be materialized during the replay. * + * If there is an empty classic consumer group of the same name, it will be deleted and a new streams + * group will be created. + * * @param groupId The group ID. + * @param records The record list to which the group tombstones are written + * if the group is empty and is a classic group. * * @return A StreamsGroup. * * Package private for testing. */ StreamsGroup getOrCreateStreamsGroup( - String groupId + String groupId, + List<CoordinatorRecord> records ) { Group group = groups.get(groupId); if (group == null) { return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics); + } else if (maybeDeleteEmptyClassicGroup(group, records)) { + log.info("[GroupId {}] Converted the empty classic group to a streams group.", groupId); + return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics); } else { return castToStreamsGroup(group); } @@ -1871,7 +1880,7 @@ public class GroupMetadataManager { boolean isJoining = memberEpoch == 0; StreamsGroup group; if (isJoining) { - group = getOrCreateStreamsGroup(groupId); + group = getOrCreateStreamsGroup(groupId, records); throwIfStreamsGroupIsFull(group); } else { group = getStreamsGroupOrThrow(groupId); @@ -6066,7 +6075,11 @@ public class GroupMetadataManager { // classicGroupJoinToConsumerGroup takes the join requests to non-empty consumer groups. // The empty consumer groups should be converted to classic groups in classicGroupJoinToClassicGroup. return classicGroupJoinToConsumerGroup((ConsumerGroup) group, context, request, responseFuture); - } else if (group.type() == CONSUMER || group.type() == CLASSIC) { + } else if (group.type() == CONSUMER || group.type() == CLASSIC || group.type() == STREAMS && group.isEmpty()) { + // classicGroupJoinToClassicGroup accepts: + // - classic groups + // - empty streams groups + // - empty consumer groups return classicGroupJoinToClassicGroup(context, request, responseFuture); } else { // Group exists but it's not a consumer group @@ -6107,6 +6120,8 @@ public class GroupMetadataManager { ClassicGroup group; if (maybeDeleteEmptyConsumerGroup(groupId, records)) { log.info("[GroupId {}] Converted the empty consumer group to a classic group.", groupId); + } else if (maybeDeleteEmptyStreamsGroup(groupId, records)) { + log.info("[GroupId {}] Converted the empty streams group to a classic group.", groupId); } boolean isNewGroup = !groups.containsKey(groupId); try { @@ -8398,6 +8413,13 @@ public class GroupMetadataManager { return group != null && group.type() == CONSUMER && group.isEmpty(); } + /** + * @return true if the group is an empty streams group. + */ + private static boolean isEmptyStreamsGroup(Group group) { + return group != null && group.type() == STREAMS && group.isEmpty(); + } + /** * Write tombstones for the group if it's empty and is a classic group. * @@ -8435,6 +8457,26 @@ public class GroupMetadataManager { } return false; } + + /** + * Delete and write tombstones for the group if it's empty and is a streams group. + * + * @param groupId The group id to be deleted. + * @param records The list of records to delete the group. + * + * @return true if the group is an empty streams group. + */ + private boolean maybeDeleteEmptyStreamsGroup(String groupId, List<CoordinatorRecord> records) { + Group group = groups.get(groupId, Long.MAX_VALUE); + if (isEmptyStreamsGroup(group)) { + // Add tombstones for the previous streams group. The tombstones won't actually be + // replayed because its coordinator result has a non-null appendFuture. + createGroupTombstoneRecords(group, records); + removeGroup(groupId); + return true; + } + return false; + } /** * Checks whether the given protocol type or name in the request is inconsistent with the group's. diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 3f1e49b955d..efe2ad96435 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -18633,6 +18633,156 @@ public class GroupMetadataManagerTest { assertNull(result.response().data().partitionsByUserEndpoint()); } + @Test + public void testStreamsGroupHeartbeatWithNonEmptyClassicGroup() { + String classicGroupId = "classic-group-id"; + String memberId = Uuid.randomUuid().toString(); + + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time + ); + context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment())); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + assertThrows(GroupIdNotFoundException.class, () -> + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(12000) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()))); + } + + @Test + public void testStreamsGroupHeartbeatWithEmptyClassicGroup() { + String classicGroupId = "classic-group-id"; + String memberId = Uuid.randomUuid().toString(); + String fooTopicName = "foo"; + String subtopology1 = "subtopology1"; + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time + ); + context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment())); + + CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(12000) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + StreamsGroupMember expectedMember = StreamsGroupMember.Builder.withDefaults(memberId) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setAssignedTasks(TasksTuple.EMPTY) + .setTasksPendingRevocation(TasksTuple.EMPTY) + .setRebalanceTimeoutMs(12000) + .setTopologyEpoch(0) + .build(); + + assertEquals(Errors.NONE.code(), result.response().data().errorCode()); + assertEquals( + List.of( + GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId), + StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId, expectedMember), + StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId, topology), + StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(classicGroupId, 1, 0), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId, memberId, TasksTuple.EMPTY), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId, expectedMember) + ), + result.records() + ); + assertEquals( + Group.GroupType.STREAMS, + context.groupMetadataManager.streamsGroup(classicGroupId).type() + ); + } + + @Test + public void testClassicGroupJoinWithEmptyStreamsGroup() throws Exception { + String streamsGroupId = "streams-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroup(new StreamsGroupBuilder(streamsGroupId, 10)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(streamsGroupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request, true); + + List<CoordinatorRecord> expectedRecords = List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(streamsGroupId), + StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(streamsGroupId), + StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(streamsGroupId) + ); + + assertEquals(Errors.MEMBER_ID_REQUIRED.code(), joinResult.joinFuture.get().errorCode()); + assertEquals(expectedRecords, joinResult.records.subList(0, expectedRecords.size())); + assertEquals( + Group.GroupType.CLASSIC, + context.groupMetadataManager.getOrMaybeCreateClassicGroup(streamsGroupId, false).type() + ); + } + + @Test + public void testClassicGroupJoinWithNonEmptyStreamsGroup() throws Exception { + String streamsGroupId = "streams-group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroup(new StreamsGroupBuilder(streamsGroupId, 10) + .withMember(StreamsGroupMember.Builder.withDefaults(memberId) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .build())) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(streamsGroupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), joinResult.joinFuture.get().errorCode()); + } + @Test public void testConsumerGroupDynamicConfigs() { String groupId = "fooup";