This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 7a9105b93fc6bfc829e52dfd24eb5ea1302c99b6 Author: Bruno Cadonna <[email protected]> AuthorDate: Mon Sep 2 11:18:19 2024 +0200 Improve the Streams group initialization handler This commit: - schedules a timeout for the initialization call - requests the initialization only from the one member - initializations of unknown topologies or already initialized topologies are silently dropped --- .../internals/StreamsAssignmentInterface.java | 3 + .../StreamsGroupHeartbeatRequestManager.java | 4 + .../StreamsGroupInitializeRequestManager.java | 3 +- .../message/StreamsGroupInitializeRequest.json | 4 +- .../StreamsGroupInitializeRequestManagerTest.java | 2 +- .../common/runtime/CoordinatorRuntime.java | 5 + .../common/runtime/CoordinatorTimer.java | 9 + .../common/runtime/MockCoordinatorTimer.java | 3 +- .../coordinator/group/GroupMetadataManager.java | 141 ++++- .../streams/CoordinatorStreamsRecordHelpers.java | 14 +- .../coordinator/group/streams/StreamsGroup.java | 54 +- .../coordinator/group/streams/StreamsTopology.java | 38 +- .../group/streams/TargetAssignmentBuilder.java | 6 +- .../group/GroupCoordinatorShardTest.java | 10 +- .../group/GroupMetadataManagerTest.java | 590 ++++++++++++++++----- .../group/GroupMetadataManagerTestContext.java | 9 + .../CoordinatorStreamsRecordHelpersTest.java | 2 +- .../group/streams/StreamsGroupBuilder.java | 6 +- .../group/streams/StreamsGroupTest.java | 31 +- .../group/streams/TargetAssignmentBuilderTest.java | 46 +- 20 files changed, 743 insertions(+), 237 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java index 5793ffec2aa..567fcb5f776 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java @@ -73,6 
+73,9 @@ public class StreamsAssignmentInterface { return assignmentConfiguration; } + // ToDo: As long as we do not compute the topology ID, let's use a constant one + public final String topologyId = "topology-id"; + // TODO: This needs to be used somewhere public Map<TaskId, Long> taskLags() { return taskLags; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java index ba0a289f187..e93b0e51c10 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java @@ -516,6 +516,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { final StreamsAssignmentInterface streamsInterface, final ConsumerMembershipManager membershipManager, final int rebalanceTimeoutMs) { + this.membershipManager = membershipManager; this.rebalanceTimeoutMs = rebalanceTimeoutMs; this.sentFields = new StreamsGroupHeartbeatRequestManager.HeartbeatState.SentFields(); @@ -532,6 +533,9 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { // GroupId - always sent data.setGroupId(membershipManager.groupId()); + // TopologyId - always sent + data.setTopologyId(streamsInterface.topologyId); + // MemberId - always sent, empty until it has been received from the coordinator data.setMemberId(membershipManager.memberId()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java index e7f763bd969..0a6a67594c1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java @@ -68,6 +68,7 @@ public class StreamsGroupInitializeRequestManager implements RequestManager { private NetworkClientDelegate.UnsentRequest makeRequest() { final StreamsGroupInitializeRequestData streamsGroupInitializeRequestData = new StreamsGroupInitializeRequestData(); streamsGroupInitializeRequestData.setGroupId(groupId); + streamsGroupInitializeRequestData.setTopologyId(streamsAssignmentInterface.topologyId); final List<StreamsGroupInitializeRequestData.Subtopology> topology = getTopologyFromStreams(); streamsGroupInitializeRequestData.setTopology(topology); final StreamsGroupInitializeRequest.Builder streamsGroupInitializeRequestBuilder = new StreamsGroupInitializeRequest.Builder( @@ -91,7 +92,7 @@ public class StreamsGroupInitializeRequestManager implements RequestManager { private static StreamsGroupInitializeRequestData.Subtopology getSubtopologyFromStreams(final String subtopologyName, final StreamsAssignmentInterface.Subtopology subtopology) { final StreamsGroupInitializeRequestData.Subtopology subtopologyData = new StreamsGroupInitializeRequestData.Subtopology(); - subtopologyData.setSubtopology(subtopologyName); + subtopologyData.setSubtopologyId(subtopologyName); subtopologyData.setSourceTopics(new ArrayList<>(subtopology.sourceTopics)); subtopologyData.setRepartitionSinkTopics(new ArrayList<>(subtopology.sinkTopics)); subtopologyData.setRepartitionSourceTopics(getRepartitionTopicsInfoFromStreams(subtopology)); diff --git a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json index 602b557e3e9..3223d1e8d9e 100644 --- a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json @@ -23,10 +23,12 @@ "fields": [ { "name": "GroupId", "type": 
"string", "versions": "0+", "entityType": "groupId", "about": "The group identifier." }, + { "name": "TopologyId", "type": "string", "versions": "0+", + "about": "The ID of the topology." }, { "name": "Topology", "type": "[]Subtopology", "versions": "0+", "about": "The sub-topologies of the streams application.", "fields": [ - { "name": "Subtopology", "type": "string", "versions": "0+", + { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "String to uniquely identify the sub-topology. Deterministically generated from the topology" }, { "name": "SourceTopics", "type": "[]string", "versions": "0+", "about": "The topics the topology reads from." }, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java index 48108204474..ebba584b0db 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java @@ -114,7 +114,7 @@ class StreamsGroupInitializeRequestManagerTest { final List<StreamsGroupInitializeRequestData.Subtopology> subtopologies = streamsGroupInitializeRequestData.topology(); assertEquals(1, subtopologies.size()); final StreamsGroupInitializeRequestData.Subtopology subtopology = subtopologies.get(0); - assertEquals(subtopologyName1, subtopology.subtopology()); + assertEquals(subtopologyName1, subtopology.subtopologyId()); assertEquals(new ArrayList<>(sourceTopics), subtopology.sourceTopics()); assertEquals(new ArrayList<>(sinkTopics), subtopology.repartitionSinkTopics()); assertEquals(repartitionTopics.size(), subtopology.repartitionSourceTopics().size()); diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java index bef30a113df..d47bd967368 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java @@ -453,6 +453,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut public int size() { return tasks.size(); } + + @Override + public boolean isScheduled(String key) { + return tasks.containsKey(key); + } } /** diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java index d10e38a7d82..c6e8456bf2b 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java @@ -84,4 +84,13 @@ public interface CoordinatorTimer<T, U> { * @param key The key. */ void cancel(String key); + + /** + * Verifies whether an operation corresponding to a given key is scheduled. + * + * @param key The key. + * + * @return {@code true} if the operation is scheduled, {@code false} otherwise. 
+ */ + boolean isScheduled(String key); } diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java index 5c55f59d608..6a17254abe9 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java @@ -155,7 +155,8 @@ public class MockCoordinatorTimer<T, U> implements CoordinatorTimer<T, U> { /** * @return True if a timeout with the key exists; false otherwise. */ - public boolean contains(String key) { + @Override + public boolean isScheduled(String key) { return timeoutMap.containsKey(key); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 40c9e652a1b..bc551a1d89f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -216,14 +216,14 @@ import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged; -import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentRecord; -import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentTombstoneRecord; +import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord; +import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord; import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord; import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord; import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupMemberTombstoneRecord; import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupPartitionMetadataRecord; import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTopologyRecord; -import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord; import static org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedActiveTasksChanged; import static org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedStandbyTasksChanged; import static org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedWarmupTasksChanged; @@ -1158,7 +1158,7 @@ public class GroupMetadataManager { } if (group == null) { - log.info("Reading persisted streams group {}", groupId); + log.info("Creating persisted streams group {}", groupId); StreamsGroup streamsGroup = new StreamsGroup(snapshotRegistry, groupId, metrics); groups.put(groupId, streamsGroup); metrics.onStreamsGroupStateTransition(null, streamsGroup.state()); @@ -2402,7 +2402,10 @@ public class GroupMetadataManager { scheduleStreamsGroupSessionTimeout(groupId, memberId); - if (group.topology() == null) { + boolean 
requestTopologyInitialization = false; + if (group.topology() == null && !isTopologyInitializationScheduled(groupId, topologyId)) { + requestTopologyInitialization = true; + scheduleStreamsGroupTopologyInitializationTimeout(groupId, topologyId, memberId, rebalanceTimeoutMs); log.info("Asking member {} at {} to initialize the topology", memberId, clientHost); } @@ -2410,7 +2413,7 @@ public class GroupMetadataManager { StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData() .setMemberId(updatedMember.memberId()) .setMemberEpoch(updatedMember.memberEpoch()) - .setShouldInitializeTopology(group.topology() == null) + .setShouldInitializeTopology(requestTopologyInitialization) .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs); // The assignment is only provided in the following cases: @@ -2654,19 +2657,28 @@ public class GroupMetadataManager { * Handles the initialization of the topology information on the broker side, that will be reused by all members of the group. * * @param groupId The group id from the request. - * @param subtopologies The list of subtopologies + * @param topologyId The topology ID. + * @param subtopologies The list of subtopologies. * @return A Result containing the StreamsGroupInitialize response and a list of records to update the state machine. 
*/ private CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> streamsGroupInitialize(String groupId, + String topologyId, List<StreamsGroupInitializeRequestData.Subtopology> subtopologies) throws ApiException { + final List<CoordinatorRecord> records = new ArrayList<>(); - log.info("Initializing topology for group {} to {}", groupId, subtopologies); + log.info("Initializing topology {} for group {} to {}", topologyId, groupId, subtopologies); final StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId, false, records); throwIfNull(group, "group does not exist"); + if (!isTopologyInitializationScheduled(groupId, topologyId)) { + log.warn("No topology to initialize for group ID {} and topology ID {} found.", groupId, topologyId); + StreamsGroupInitializeResponseData response = new StreamsGroupInitializeResponseData(); + return new CoordinatorResult<>(records, response); + } + // TODO: For the POC, only check if internal topics exist Set<String> missingTopics = new HashSet<>(); for (StreamsGroupInitializeRequestData.Subtopology subtopology : subtopologies) { @@ -2681,6 +2693,9 @@ public class GroupMetadataManager { } } } + + cancelStreamsGroupTopologyInitializationTimeout(groupId, topologyId); + if (!missingTopics.isEmpty()) { StreamsGroupInitializeResponseData response = new StreamsGroupInitializeResponseData() @@ -2691,6 +2706,10 @@ public class GroupMetadataManager { } else { records.add(newStreamsGroupTopologyRecord(groupId, subtopologies)); + final StreamsTopology topology = new StreamsTopology(topologyId, subtopologies); + + computeFirstTargetAssignmentAfterTopologyInitialization(group, records, topology); + StreamsGroupInitializeResponseData response = new StreamsGroupInitializeResponseData(); return new CoordinatorResult<>(records, response); @@ -2698,6 +2717,56 @@ public class GroupMetadataManager { } + // ToDo: verify how much code we can share with Streams heartbeat handler + private void 
computeFirstTargetAssignmentAfterTopologyInitialization(StreamsGroup group, + List<CoordinatorRecord> records, + StreamsTopology topology) { + String groupId = group.groupId(); + final long currentTimeMs = time.milliseconds(); + int groupEpoch = group.groupEpoch(); + + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> subscriptionMetadata = + group.computeSubscriptionMetadata( + metadataImage.topics(), + metadataImage.cluster(), + topology + ); + if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + log.info("[GroupId {}] Computed new partition metadata: {}.", groupId, subscriptionMetadata); + records.add(newStreamsGroupPartitionMetadataRecord(groupId, subscriptionMetadata)); + } + groupEpoch += 1; + records.add(newStreamsGroupEpochRecord(groupId, groupEpoch)); + log.info("[GroupId {}] Bumped streams group epoch to {}.", groupId, groupEpoch); + metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME); + group.setMetadataRefreshDeadline(currentTimeMs + streamsGroupMetadataRefreshIntervalMs, groupEpoch); + + // TODO: Read the preferred server assignor from the group configuration + String preferredServerAssignor = defaultTaskAssignor.name(); + try { + org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder assignmentResultBuilder = + new org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder(group.groupId(), groupEpoch, taskAssignors.get(preferredServerAssignor)) + .withMembers(group.members()) + .withTopology(topology) + .withStaticMembers(group.staticMembers()) + .withSubscriptionMetadata(subscriptionMetadata) + .withTargetAssignment(group.targetAssignment()); + org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = + assignmentResultBuilder + .build(); + + log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor: {}.", + group.groupId(), groupEpoch, preferredServerAssignor, assignmentResult.targetAssignment()); + + 
records.addAll(assignmentResult.records()); + } catch (PartitionAssignorException ex) { + String msg = String.format("Failed to compute a new target assignment for epoch %d: %s", + groupEpoch, ex.getMessage()); + log.error("[GroupId {}] {}.", group.groupId(), msg); + throw new UnknownServerException(msg, ex); + } + } + /** * Handle a JoinGroupRequest to a ConsumerGroup. * @@ -3692,7 +3761,7 @@ public class GroupMetadataManager { .build(); if (!updatedMember.equals(member)) { - records.add(newStreamsCurrentAssignmentRecord(groupId, updatedMember)); + records.add(newStreamsGroupCurrentAssignmentRecord(groupId, updatedMember)); log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, " + "assignedActiveTasks={}, assignedStandbyTasks={}, assignedWarmupTasks={} and revokedPartitions={}.", @@ -4062,7 +4131,7 @@ public class GroupMetadataManager { .build(); return new CoordinatorResult<>( - Collections.singletonList(newStreamsCurrentAssignmentRecord(group.groupId(), leavingStaticMember)), + Collections.singletonList(newStreamsGroupCurrentAssignmentRecord(group.groupId(), leavingStaticMember)), new StreamsGroupHeartbeatResponseData() .setMemberId(member.memberId()) .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH) @@ -4224,8 +4293,8 @@ public class GroupMetadataManager { * @param memberId The member id. */ private void removeStreamsMember(List<CoordinatorRecord> records, String groupId, String memberId) { - records.add(newStreamsCurrentAssignmentTombstoneRecord(groupId, memberId)); - records.add(newStreamsTargetAssignmentTombstoneRecord(groupId, memberId)); + records.add(newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId)); + records.add(newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId)); records.add(newStreamsGroupMemberTombstoneRecord(groupId, memberId)); } @@ -4459,6 +4528,49 @@ public class GroupMetadataManager { } /** + * Schedules (or reschedules) the topology initialisation timeout for the member. 
+ * + * @param groupId The group id. + * @param topologyId The topology id. + * @param memberId The member id. + * @param topologyIntializationTimeout The topology initialization timeout. + */ + private void scheduleStreamsGroupTopologyInitializationTimeout( + String groupId, + String topologyId, + String memberId, + int topologyIntializationTimeout + ) { + timer.schedule( + streamsTopologyInitializationTimeoutKey(groupId, topologyId), + topologyIntializationTimeout, + TimeUnit.MILLISECONDS, + true, + () -> streamsGroupFenceMemberOperation(groupId, memberId, "the timeout for the topology intialization expired.") + ); + } + + /** + * Verifies if the topology initialization timeout is scheduled. + * + * @param groupId The group id. + * @param topologyId The topology id. + * @return {@code true} if the topology initialization timeout is scheduled, {@code false} otherwise. + */ + private boolean isTopologyInitializationScheduled(String groupId, String topologyId) { + return timer.isScheduled(streamsTopologyInitializationTimeoutKey(groupId, topologyId)); + } + + /** + * Cancels the topology initialization timeout. + * + * @param groupId The group id. + * @param topologyId The topology id. + */ + private void cancelStreamsGroupTopologyInitializationTimeout(String groupId, String topologyId) { + timer.cancel(streamsTopologyInitializationTimeoutKey(groupId, topologyId)); + } + /** * Cancels the session timeout of the member. * @@ -4793,6 +4905,7 @@ public class GroupMetadataManager { return streamsGroupInitialize( request.groupId(), + request.topologyId(), request.topology() ); } @@ -5824,6 +5937,10 @@ public class GroupMetadataManager { return "rebalance-timeout-" + groupId + "-" + memberId; } + public static String streamsTopologyInitializationTimeoutKey(String groupId, String topologyId) { + return "topology-initialization-timeout-" + groupId + "-" + topologyId; + } + /** * Replays GroupMetadataKey/Value to update the soft state of * the classic group. 
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java index 2f48151e137..0e615db1c14 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java @@ -202,7 +202,7 @@ public class CoordinatorStreamsRecordHelpers { ); } - public static CoordinatorRecord newStreamsTargetAssignmentRecord( + public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord( String groupId, String memberId, Map<String, Set<Integer>> activeTasks, @@ -258,7 +258,7 @@ public class CoordinatorStreamsRecordHelpers { * @param memberId The streams group member id. * @return The record. */ - public static CoordinatorRecord newStreamsTargetAssignmentTombstoneRecord( + public static CoordinatorRecord newStreamsGroupTargetAssignmentTombstoneRecord( String groupId, String memberId ) { @@ -274,7 +274,7 @@ public class CoordinatorStreamsRecordHelpers { } - public static CoordinatorRecord newStreamsTargetAssignmentEpochRecord( + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord( String groupId, int assignmentEpoch ) { @@ -298,7 +298,7 @@ public class CoordinatorStreamsRecordHelpers { * @param groupId The streams group id. * @return The record. 
*/ - public static CoordinatorRecord newStreamsTargetAssignmentEpochTombstoneRecord( + public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochTombstoneRecord( String groupId ) { return new CoordinatorRecord( @@ -311,7 +311,7 @@ public class CoordinatorStreamsRecordHelpers { ); } - public static CoordinatorRecord newStreamsCurrentAssignmentRecord( + public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord( String groupId, StreamsGroupMember member ) { @@ -343,7 +343,7 @@ public class CoordinatorStreamsRecordHelpers { * @param memberId The streams group member id. * @return The record. */ - public static CoordinatorRecord newStreamsCurrentAssignmentTombstoneRecord( + public static CoordinatorRecord newStreamsGroupCurrentAssignmentTombstoneRecord( String groupId, String memberId ) { @@ -397,7 +397,7 @@ public class CoordinatorStreamsRecordHelpers { return new StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs); }).collect(Collectors.toList()); - value.topology().add(new StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopology()) + value.topology().add(new StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopologyId()) .setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics()) .setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics)); }); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index 1d4c2ace5e1..4003b9a1125 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -29,8 +29,6 @@ import org.apache.kafka.coordinator.group.CoordinatorRecord; import 
org.apache.kafka.coordinator.group.Group; import org.apache.kafka.coordinator.group.OffsetExpirationCondition; import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; -import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; -import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.image.ClusterImage; import org.apache.kafka.image.TopicImage; @@ -49,11 +47,11 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.INITIALIZING; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE; @@ -64,6 +62,7 @@ public class StreamsGroup implements Group { public enum StreamsGroupState { EMPTY("Empty"), + INITIALIZING("Initializing"), ASSIGNING("Assigning"), RECONCILING("Reconciling"), STABLE("Stable"), @@ -189,7 +188,7 @@ public class StreamsGroup implements Group { ) { this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry); this.groupId = Objects.requireNonNull(groupId); - this.state = new TimelineObject<>(snapshotRegistry, EMPTY); + this.state = new TimelineObject<>(snapshotRegistry, INITIALIZING); this.groupEpoch = new TimelineInteger(snapshotRegistry); this.members = new TimelineHashMap<>(snapshotRegistry, 0); this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0); @@ -246,6 +245,7 @@ public class StreamsGroup implements Group { public 
void setTopology(StreamsTopology topology) { this.topology.set(Optional.of(topology)); + maybeUpdateGroupState(); } /** @@ -648,9 +648,10 @@ public class StreamsGroup implements Group { */ public Map<String, TopicMetadata> computeSubscriptionMetadata( TopicsImage topicsImage, - ClusterImage clusterImage + ClusterImage clusterImage, + StreamsTopology topology ) { - Set<String> subscribedTopicNames = topology.get().map(StreamsTopology::topicSubscription).orElse(Collections.emptySet()); + Set<String> subscribedTopicNames = topology.topicSubscription(); // Create the topic metadata for each subscribed topic. Map<String, TopicMetadata> newSubscriptionMetadata = new HashMap<>(subscribedTopicNames.size()); @@ -685,6 +686,24 @@ public class StreamsGroup implements Group { return Collections.unmodifiableMap(newSubscriptionMetadata); } + /** + * Computes the subscription metadata based on the current subscription info. + * + * @param topicsImage The current metadata for all available topics. + * @param clusterImage The current metadata for the Kafka cluster. + * @return An immutable map of subscription metadata for each topic that the streams group is subscribed to, or an empty map if the group has no topology yet. + */ + public Map<String, TopicMetadata> computeSubscriptionMetadata( + TopicsImage topicsImage, + ClusterImage clusterImage + ) { + Optional<StreamsTopology> topology = this.topology.get(); + if (topology.isPresent()) { + return computeSubscriptionMetadata(topicsImage, clusterImage, topology.get()); + } + return Collections.emptyMap(); + } + /** * Updates the metadata refresh deadline. 
* @@ -791,7 +810,7 @@ public class StreamsGroup implements Group { */ @Override public void validateDeleteGroup() throws ApiException { - if (state() != StreamsGroupState.EMPTY) { + if (state() != StreamsGroupState.EMPTY && state() != StreamsGroupState.INITIALIZING) { throw Errors.NON_EMPTY_GROUP.exception(); } } @@ -809,13 +828,13 @@ public class StreamsGroup implements Group { @Override public void createGroupTombstoneRecords(List<CoordinatorRecord> records) { members().forEach((memberId, member) -> - records.add(CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentTombstoneRecord(groupId(), memberId)) + records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId(), memberId)) ); members().forEach((memberId, member) -> - records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentTombstoneRecord(groupId(), memberId)) + records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId(), memberId)) ); - records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentEpochTombstoneRecord(groupId())); + records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(groupId())); members().forEach((memberId, member) -> records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId(), memberId)) @@ -865,7 +884,9 @@ public class StreamsGroup implements Group { private void maybeUpdateGroupState() { StreamsGroupState previousState = state.get(); StreamsGroupState newState = STABLE; - if (members.isEmpty()) { + if (topology() == null) { + newState = INITIALIZING; + } else if (members.isEmpty()) { newState = EMPTY; } else if (groupEpoch.get() > targetAssignmentEpoch.get()) { newState = ASSIGNING; @@ -1058,7 +1079,7 @@ public class StreamsGroup implements Group { records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId(), groupEpoch())); members().forEach((streamsGroupMemberId, streamsGroupMember) -> 
- records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord( + records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord( groupId(), streamsGroupMemberId, targetAssignment(streamsGroupMemberId).activeTasks(), @@ -1067,11 +1088,11 @@ public class StreamsGroup implements Group { )) ); - records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentEpochRecord(groupId(), groupEpoch())); + records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId(), groupEpoch())); records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId(), subscriptionMetadata())); members().forEach((__, streamsGroupMember) -> - records.add(CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentRecord(groupId(), streamsGroupMember)) + records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId(), streamsGroupMember)) ); } @@ -1115,9 +1136,4 @@ public class StreamsGroup implements Group { } return false; } - - public void setTopology(final StreamsGroupTopologyValue topology) { - this.topology.set(Optional.of(new StreamsTopology(topology.topologyId(), topology.topology().stream().collect(Collectors.toMap( - Subtopology::subtopology, x -> x))))); - } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java index 5f09941fae9..ca2fd9a6da0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.coordinator.group.streams; +import java.util.HashMap; import java.util.List; import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; +import 
org.apache.kafka.common.message.StreamsGroupInitializeRequestData; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; @@ -37,11 +39,41 @@ public class StreamsTopology { private final Map<String, Subtopology> subtopologies; - public StreamsTopology(final String topologyId, final Map<String, Subtopology> subtopologies) { + public StreamsTopology(final String topologyId, + final Map<String, Subtopology> subtopologies) { this.topologyId = topologyId; this.subtopologies = subtopologies; } + public StreamsTopology(final String topologyId, + final List<StreamsGroupInitializeRequestData.Subtopology> subtopologies) { + this.topologyId = topologyId; + this.subtopologies = new HashMap<>(); + subtopologies.forEach(subtopology -> { + List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics = subtopology.repartitionSourceTopics().stream() + .map(topicInfo -> { + List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream() + .map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value())) + .collect(Collectors.toList()) : null; + return new StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs) + .setPartitions(topicInfo.partitions()); + }).collect(Collectors.toList()); + + List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics = subtopology.stateChangelogTopics().stream().map(topicInfo -> { + List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = topicInfo.topicConfigs() != null ? 
topicInfo.topicConfigs().stream() + .map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value())) + .collect(Collectors.toList()) : null; + return new StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs); + }).collect(Collectors.toList()); + + this.subtopologies.put( + subtopology.subtopologyId(), + new StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopologyId()) + .setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics()) + .setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics)); + }); + } + public String topologyId() { return topologyId; } @@ -57,9 +89,7 @@ public class StreamsTopology { Collectors.toSet()); } - public static StreamsTopology fromRecord( - StreamsGroupTopologyValue record - ) { + public static StreamsTopology fromRecord(StreamsGroupTopologyValue record) { return new StreamsTopology( record.topologyId(), record.topology().stream().collect(Collectors.toMap(Subtopology::subtopology, x -> x)) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java index a09919e2157..d4cb0b85275 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java @@ -306,7 +306,7 @@ public class TargetAssignmentBuilder { if (oldMemberAssignment == null) { // If the member had no assignment, we always create a record for it. 
- records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord( + records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord( groupId, memberId, newMemberAssignment.activeTasks(), @@ -317,7 +317,7 @@ public class TargetAssignmentBuilder { // If the member had an assignment, we only create a record if the // new assignment is different. if (!newMemberAssignment.equals(oldMemberAssignment)) { - records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord( + records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord( groupId, memberId, newMemberAssignment.activeTasks(), @@ -329,7 +329,7 @@ public class TargetAssignmentBuilder { }); // Bump the target assignment epoch. - records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentEpochRecord(groupId, groupEpoch)); + records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, groupEpoch)); return new TargetAssignmentResult(records, newTargetAssignment); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 263b2ab76ea..03966c07ea8 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -1009,16 +1009,16 @@ public class GroupCoordinatorShardTest { // Confirm the cleanup is scheduled when the coordinator is initially loaded. coordinator.onLoaded(image); - assertTrue(timer.contains(GROUP_EXPIRATION_KEY)); + assertTrue(timer.isScheduled(GROUP_EXPIRATION_KEY)); // Confirm that it is rescheduled after completion. 
mockTime.sleep(1000L); List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> tasks = timer.poll(); assertEquals(1, tasks.size()); - assertTrue(timer.contains(GROUP_EXPIRATION_KEY)); + assertTrue(timer.isScheduled(GROUP_EXPIRATION_KEY)); coordinator.onUnloaded(); - assertFalse(timer.contains(GROUP_EXPIRATION_KEY)); + assertFalse(timer.isScheduled(GROUP_EXPIRATION_KEY)); } @Test @@ -1058,9 +1058,9 @@ public class GroupCoordinatorShardTest { return null; }).when(groupMetadataManager).maybeDeleteGroup(eq("group-id"), recordsCapture.capture()); - assertFalse(timer.contains(GROUP_EXPIRATION_KEY)); + assertFalse(timer.isScheduled(GROUP_EXPIRATION_KEY)); CoordinatorResult<Void, CoordinatorRecord> result = coordinator.cleanupGroupMetadata(); - assertTrue(timer.contains(GROUP_EXPIRATION_KEY)); + assertTrue(timer.isScheduled(GROUP_EXPIRATION_KEY)); List<CoordinatorRecord> expectedRecords = Arrays.asList(offsetCommitTombstone, groupMetadataTombstone); assertEquals(expectedRecords, result.records()); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 05a94ea49a6..58d3a4e5ecc 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -56,6 +56,9 @@ import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; +import 
org.apache.kafka.common.message.StreamsGroupInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment; import org.apache.kafka.common.message.SyncGroupResponseData; @@ -95,6 +98,7 @@ import org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelper import org.apache.kafka.coordinator.group.streams.StreamsGroup; import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder; import org.apache.kafka.coordinator.group.streams.StreamsGroupMember; +import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataProvenance; @@ -125,6 +129,8 @@ import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals; import static org.apache.kafka.coordinator.group.Assertions.assertRecordsEquals; import static org.apache.kafka.coordinator.group.Assertions.assertResponseEquals; @@ -410,6 +416,440 @@ public class GroupMetadataManagerTest { assertEquals(2, result.response().memberEpoch()); } + @Test + public void testJoiningNonExistingStreamsGroup() { + String groupId = "group-id"; + int rebalanceTimeoutMs = 300000; + String topologyId = "topology-id"; + String processId = "process-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + StreamsGroupHeartbeatRequestData 
heartbeat = buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, processId, rebalanceTimeoutMs); + + CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> result = context.streamsGroupHeartbeat(heartbeat); + + assertNotNull(result.response()); + StreamsGroupHeartbeatResponseData response = result.response(); + assertEquals(Errors.NONE.code(), response.errorCode()); + assertFalse(response.memberId().isEmpty()); + assertEquals(1, response.memberEpoch()); + assertTrue(response.shouldInitializeTopology()); + assertTrue(response.activeTasks().isEmpty()); + assertTrue(response.standbyTasks().isEmpty()); + assertTrue(response.warmupTasks().isEmpty()); + List<CoordinatorRecord> coordinatorRecords = result.records(); + assertEquals(5, coordinatorRecords.size()); + assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId, 1))); + StreamsGroupMember member = new StreamsGroupMember.Builder(response.memberId()) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(rebalanceTimeoutMs) + .setTopologyId(topologyId) + .setProcessId(processId) + .build(); + assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(groupId, member))); + assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1))); + assertTrue(coordinatorRecords.contains( + CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord( + groupId, + member.memberId(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ) + )); + StreamsGroupMember updatedMember = new org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder(member) + .withTargetAssignment( + 1, + new org.apache.kafka.coordinator.group.streams.Assignment(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()) + ) + .withCurrentActiveTaskEpoch((a, b) -> 1) + 
.withOwnedActiveTasks(Collections.emptyList()) + .withOwnedStandbyTasks(Collections.emptyList()) + .withOwnedWarmupTasks(Collections.emptyList()) + .build(); + CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, updatedMember); + assertEquals(StreamsGroup.StreamsGroupState.INITIALIZING, context.streamsGroupState("group-id")); + assertTrue(context.timer.isScheduled("topology-initialization-timeout-group-id-topology-id")); + assertEquals(rebalanceTimeoutMs, context.timer.timeout("topology-initialization-timeout-group-id-topology-id").deadlineMs - context.time.milliseconds()); + } + + @Test + public void testJoiningExistingInitializingStreamsGroup() { + String groupId = "group-id"; + int rebalanceTimeoutMs = 300000; + String topologyId = "topology-id"; + String processId = "process-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + StreamsGroupHeartbeatRequestData heartbeatToCreateAndRequestInitGroup = + buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, processId, rebalanceTimeoutMs); + context.streamsGroupHeartbeat(heartbeatToCreateAndRequestInitGroup); + StreamsGroupHeartbeatRequestData heartbeat = + buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, processId, rebalanceTimeoutMs); + + CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> result = context.streamsGroupHeartbeat(heartbeat); + + assertNotNull(result.response()); + StreamsGroupHeartbeatResponseData response = result.response(); + assertEquals(Errors.NONE.code(), response.errorCode()); + assertFalse(response.memberId().isEmpty()); + assertEquals(2, response.memberEpoch()); + assertFalse(response.shouldInitializeTopology()); + assertTrue(response.activeTasks().isEmpty()); + assertTrue(response.standbyTasks().isEmpty()); + assertTrue(response.warmupTasks().isEmpty()); + List<CoordinatorRecord> coordinatorRecords = result.records(); + assertEquals(5, coordinatorRecords.size()); + 
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId, 2))); + StreamsGroupMember member = new StreamsGroupMember.Builder(response.memberId()) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(rebalanceTimeoutMs) + .setTopologyId(topologyId) + .setProcessId(processId) + .build(); + assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(groupId, member))); + assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 2))); + assertTrue(coordinatorRecords.contains( + CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord( + groupId, + member.memberId(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ) + )); + StreamsGroupMember updatedMember = new org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder(member) + .withTargetAssignment( + 1, + new org.apache.kafka.coordinator.group.streams.Assignment(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()) + ) + .withCurrentActiveTaskEpoch((a, b) -> 1) + .withOwnedActiveTasks(Collections.emptyList()) + .withOwnedStandbyTasks(Collections.emptyList()) + .withOwnedWarmupTasks(Collections.emptyList()) + .build(); + CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, updatedMember); + assertEquals(StreamsGroup.StreamsGroupState.INITIALIZING, context.streamsGroupState("group-id")); + } + + @Test + public void testInitTopologyExistingInitializingStreamsGroup() { + String groupId = "group-id"; + int rebalanceTimeoutMs = 300000; + String topologyId = "topology-id"; + String processId = "process-id"; + String inputTopicName = "input-topic"; + String subtopologyId = "subtopology-id"; + Uuid inputTopicId = Uuid.randomUuid(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(inputTopicId, inputTopicName, 3) + 
.addRacks() + .build(); + MockTaskAssignor assignor = new MockTaskAssignor("mock"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withMetadataImage(metadataImage) + .withTaskAssignors(Collections.singletonList(assignor)) + .build(); + StreamsGroupHeartbeatRequestData heartbeatToCreateAndRequestInitGroup = + buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, processId, rebalanceTimeoutMs); + final CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> heartbeatResult = + context.streamsGroupHeartbeat(heartbeatToCreateAndRequestInitGroup); + final List<StreamsGroupInitializeRequestData.Subtopology> subtopologies = new ArrayList<>(); + subtopologies.add(new StreamsGroupInitializeRequestData.Subtopology() + .setSubtopologyId(subtopologyId) + .setRepartitionSinkTopics(Collections.emptyList()) + .setRepartitionSourceTopics(Collections.emptyList()) + .setSourceTopics(Collections.singletonList("input-topic")) + ); + prepareStreamsGroupAssignment(assignor, heartbeatResult.response().memberId(), subtopologyId); + StreamsGroupInitializeRequestData initialize = new StreamsGroupInitializeRequestData() + .setGroupId(groupId) + .setTopologyId(topologyId) + .setTopology(subtopologies); + + CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = context.streamsGroupInitialize(initialize); + + assertNotNull(result.response()); + StreamsGroupInitializeResponseData response = result.response(); + assertEquals(Errors.NONE.code(), response.errorCode()); + List<CoordinatorRecord> coordinatorRecords = result.records(); + assertEquals(5, coordinatorRecords.size()); + assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId, 2))); + assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTopologyRecord(groupId, subtopologies))); + org.apache.kafka.coordinator.group.streams.TopicMetadata topicMetadata = new 
org.apache.kafka.coordinator.group.streams.TopicMetadata( + inputTopicId, + inputTopicName, + 3, + mkMap( + mkEntry(0, mkSet("rack0", "rack1")), + mkEntry(1, mkSet("rack1", "rack2")), + mkEntry(2, mkSet("rack2", "rack3")) + ) + ); + assertTrue(coordinatorRecords.contains( + CoordinatorStreamsRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, mkMap(mkEntry(inputTopicName, topicMetadata))) + )); + assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 2))); + assertTrue(coordinatorRecords.contains( + CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord( + groupId, + heartbeatResult.response().memberId(), + TaskAssignmentTestUtil.mkAssignment(TaskAssignmentTestUtil.mkTaskAssignment(subtopologyId, 0, 1, 2)), + Collections.emptyMap(), + Collections.emptyMap() + ) + )); + assertFalse(context.timer.isScheduled("topology-initialization-timeout-group-id-topology-id")); + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, context.streamsGroupState("group-id")); + } + + @Test + public void testInitTopologyWithWrongGroupId() { + String groupId = "group-id"; + int rebalanceTimeoutMs = 300000; + String topologyId = "topology-id"; + String processId = "process-id"; + String inputTopicName = "input-topic"; + String subtopologyId = "subtopology-id"; + Uuid inputTopicId = Uuid.randomUuid(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(inputTopicId, inputTopicName, 3) + .addRacks() + .build(); + MockTaskAssignor assignor = new MockTaskAssignor("mock"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withMetadataImage(metadataImage) + .withTaskAssignors(Collections.singletonList(assignor)) + .build(); + StreamsGroupHeartbeatRequestData heartbeatToCreateAndRequestInitGroup = + buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, processId, rebalanceTimeoutMs); + final 
CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> heartbeatResult = + context.streamsGroupHeartbeat(heartbeatToCreateAndRequestInitGroup); + final List<StreamsGroupInitializeRequestData.Subtopology> subtopologies = new ArrayList<>(); + subtopologies.add(new StreamsGroupInitializeRequestData.Subtopology() + .setSubtopologyId(subtopologyId) + .setRepartitionSinkTopics(Collections.emptyList()) + .setRepartitionSourceTopics(Collections.emptyList()) + .setSourceTopics(Collections.singletonList("input-topic")) + ); + prepareStreamsGroupAssignment(assignor, heartbeatResult.response().memberId(), subtopologyId); + final String wrongGroupId = "wrong-" + groupId; + StreamsGroupInitializeRequestData initialize = new StreamsGroupInitializeRequestData() + .setGroupId(wrongGroupId) + .setTopologyId(topologyId) + .setTopology(subtopologies); + + final GroupIdNotFoundException groupIdNotFoundException = + assertThrows(GroupIdNotFoundException.class, () -> context.streamsGroupInitialize(initialize)); + + assertEquals( + "Streams group " + wrongGroupId + " not found.", + groupIdNotFoundException.getMessage() + ); + } + + @Test + public void testInitTopologyWithWrongTopologyId() { + String groupId = "group-id"; + int rebalanceTimeoutMs = 300000; + String topologyId = "topology-id"; + String processId = "process-id"; + String inputTopicName = "input-topic"; + String subtopologyId = "subtopology-id"; + Uuid inputTopicId = Uuid.randomUuid(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(inputTopicId, inputTopicName, 3) + .addRacks() + .build(); + MockTaskAssignor assignor = new MockTaskAssignor("mock"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withMetadataImage(metadataImage) + .withTaskAssignors(Collections.singletonList(assignor)) + .build(); + StreamsGroupHeartbeatRequestData heartbeatToCreateAndRequestInitGroup = + buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, processId, 
rebalanceTimeoutMs); + final CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> heartbeatResult = + context.streamsGroupHeartbeat(heartbeatToCreateAndRequestInitGroup); + final List<StreamsGroupInitializeRequestData.Subtopology> subtopologies = new ArrayList<>(); + subtopologies.add(new StreamsGroupInitializeRequestData.Subtopology() + .setSubtopologyId(subtopologyId) + .setRepartitionSinkTopics(Collections.emptyList()) + .setRepartitionSourceTopics(Collections.emptyList()) + .setSourceTopics(Collections.singletonList("input-topic")) + ); + prepareStreamsGroupAssignment(assignor, heartbeatResult.response().memberId(), subtopologyId); + final String wrongTopologyId = "wrong-" + topologyId; + StreamsGroupInitializeRequestData initialize = new StreamsGroupInitializeRequestData() + .setGroupId(groupId) + .setTopologyId(wrongTopologyId) + .setTopology(subtopologies); + + CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = context.streamsGroupInitialize(initialize); + + assertNotNull(result.response()); + StreamsGroupInitializeResponseData response = result.response(); + assertEquals(Errors.NONE.code(), response.errorCode()); + assertTrue(result.records().isEmpty()); + } + + @Test + public void testInitTopologyForInitializedStreamsGroup() { + String groupId = "group-id"; + int rebalanceTimeoutMs = 300000; + String topologyId = "topology-id"; + String processId = "process-id"; + String inputTopicName = "input-topic"; + String subtopologyId = "subtopology-id"; + Uuid inputTopicId = Uuid.randomUuid(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(inputTopicId, inputTopicName, 3) + .addRacks() + .build(); + MockTaskAssignor assignor = new MockTaskAssignor("mock"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withMetadataImage(metadataImage) + .withTaskAssignors(Collections.singletonList(assignor)) + .build(); + StreamsGroupHeartbeatRequestData 
heartbeatToCreateAndRequestInitGroup = + buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, processId, rebalanceTimeoutMs); + final CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord> heartbeatResult = + context.streamsGroupHeartbeat(heartbeatToCreateAndRequestInitGroup); + final List<StreamsGroupInitializeRequestData.Subtopology> subtopologies = new ArrayList<>(); + subtopologies.add(new StreamsGroupInitializeRequestData.Subtopology() + .setSubtopologyId(subtopologyId) + .setRepartitionSinkTopics(Collections.emptyList()) + .setRepartitionSourceTopics(Collections.emptyList()) + .setSourceTopics(Collections.singletonList("input-topic")) + ); + prepareStreamsGroupAssignment(assignor, heartbeatResult.response().memberId(), subtopologyId); + StreamsGroupInitializeRequestData initialize = new StreamsGroupInitializeRequestData() + .setGroupId(groupId) + .setTopologyId(topologyId) + .setTopology(subtopologies); + + context.streamsGroupInitialize(initialize); + CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = context.streamsGroupInitialize(initialize); + + assertNotNull(result.response()); + StreamsGroupInitializeResponseData response = result.response(); + assertEquals(Errors.NONE.code(), response.errorCode()); + assertTrue(result.records().isEmpty()); + } + + @Test + public void testTopologyInitializationMissingInternalTopics() { + String groupId = "group-id"; + int rebalanceTimeoutMs = 300000; + String topologyId = "topology-id"; + String processId = "process-id"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "repartition"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build()) + .build(); + StreamsGroupHeartbeatRequestData heartbeatToCreateAndRequestInitGroup = + buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, processId, 
rebalanceTimeoutMs); + context.streamsGroupHeartbeat(heartbeatToCreateAndRequestInitGroup); + assertThrows(GroupIdNotFoundException.class, () -> + context.groupMetadataManager.consumerGroup(groupId)); + final List<StreamsGroupInitializeRequestData.Subtopology> topology = Collections.singletonList( + new StreamsGroupInitializeRequestData.Subtopology() + .setSubtopologyId("subtopology-id") + .setSourceTopics(Collections.singletonList("bar")) + .setRepartitionSourceTopics( + Collections.singletonList( + new StreamsGroupInitializeRequestData.TopicInfo() + .setName("repartition") + .setPartitions(4) + .setTopicConfigs(Collections.singletonList( + new StreamsGroupInitializeRequestData.TopicConfig() + .setKey("config-name1") + .setValue("config-value1") + )) + ) + ) + .setStateChangelogTopics( + Collections.singletonList( + new StreamsGroupInitializeRequestData.TopicInfo() + .setName("changelog") + .setTopicConfigs(Collections.singletonList( + new StreamsGroupInitializeRequestData.TopicConfig() + .setKey("config-name2") + .setValue("config-value2") + )) + ) + ) + ); + CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = + context.streamsGroupInitialize( + new StreamsGroupInitializeRequestData() + .setGroupId(groupId) + .setTopologyId(topologyId) + .setTopology(topology) + ); + + assertEquals( + new StreamsGroupInitializeResponseData() + .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code()) + .setErrorMessage("Internal topics changelog do not exist."), + result.response() + ); + + assertTrue(result.records().isEmpty()); + } + + private StreamsGroupHeartbeatRequestData buildFirstStreamsGroupHeartbeatRequest( + final String groupId, + final String topologyId, + final String processId, + final int rebalanceTimeoutMs) { + + return new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId("") + .setMemberEpoch(0) + .setInstanceId(null) + .setRackId(null) + .setRebalanceTimeoutMs(rebalanceTimeoutMs) + .setTopologyId(topologyId) + 
.setActiveTasks(Collections.emptyList()) + .setStandbyTasks(Collections.emptyList()) + .setWarmupTasks(Collections.emptyList()) + .setProcessId(processId) + .setUserEndpoint(null) + .setClientTags(null) + .setTaskOffsets(null) + .setTaskEndOffsets(null) + .setShutdownApplication(false); + } + + private void prepareStreamsGroupAssignment(final MockTaskAssignor assignor, + final String memberId, + final String subtopologyId) { + assignor.prepareGroupAssignment(new org.apache.kafka.coordinator.group.taskassignor.GroupAssignment( + mkMap( + mkEntry( + memberId, + new org.apache.kafka.coordinator.group.taskassignor.MemberAssignment( + mkMap( + mkEntry( + subtopologyId, + mkSet(0, 1, 2) + ) + ), + Collections.emptyMap(), + Collections.emptyMap() + ) + ) + ) + )); + } + @Test public void testMemberIdGeneration() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); @@ -8865,7 +9305,7 @@ public class GroupMetadataManagerTest { new StreamsGroupDescribeResponseData.DescribedGroup() .setGroupEpoch(epoch) .setGroupId(streamsGroupIds.get(0)) - .setGroupState(StreamsGroup.StreamsGroupState.EMPTY.toString()) + .setGroupState(StreamsGroup.StreamsGroupState.INITIALIZING.toString()) .setAssignmentEpoch(0), new StreamsGroupDescribeResponseData.DescribedGroup() .setGroupEpoch(epoch) @@ -8875,7 +9315,7 @@ public class GroupMetadataManagerTest { new org.apache.kafka.coordinator.group.streams.Assignment(Collections.emptyMap()) ) )) - .setGroupState(StreamsGroup.StreamsGroupState.ASSIGNING.toString()) + .setGroupState(StreamsGroup.StreamsGroupState.INITIALIZING.toString()) ); List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(streamsGroupIds); @@ -8924,8 +9364,8 @@ public class GroupMetadataManagerTest { StreamsGroupMember.Builder memberBuilder2 = new StreamsGroupMember.Builder(memberId2); context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder2.build())); - 
context.replay(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord(streamsGroupId, memberId2, assignmentMap, assignmentMap, assignmentMap)); - context.replay(CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentRecord(streamsGroupId, memberBuilder2.build())); + context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId, memberId2, assignmentMap, assignmentMap, assignmentMap)); + context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder2.build())); context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 2)); List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(Collections.singletonList(streamsGroupId), context.lastCommittedOffset); @@ -8945,7 +9385,7 @@ public class GroupMetadataManagerTest { memberBuilder1.build().asStreamsGroupDescribeMember(new org.apache.kafka.coordinator.group.streams.Assignment(Collections.emptyMap())), memberBuilder2.build().asStreamsGroupDescribeMember(new org.apache.kafka.coordinator.group.streams.Assignment(assignmentMap, assignmentMap, assignmentMap)) )) - .setGroupState(StreamsGroup.StreamsGroupState.ASSIGNING.toString()) + .setGroupState(StreamsGroup.StreamsGroupState.INITIALIZING.toString()) .setGroupEpoch(epoch + 2); assertEquals(1, actual.size()); assertEquals(describedGroup, actual.get(0)); @@ -15963,144 +16403,4 @@ public class GroupMetadataManagerTest { assertEquals(expectedSuccessCount, successCount); return memberIds; } - - // TODO: bring back topology initialization unit tests -// @Test -// public void testTopologyInitialization() { -// String groupId = "fooup"; -// -// Uuid fooTopicId = Uuid.randomUuid(); -// String fooTopicName = "repartition"; -// Uuid barTopicId = Uuid.randomUuid(); -// String barTopicName = "changelog"; -// -// GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder() -// .withMetadataImage(new MetadataImageBuilder() -// .addTopic(fooTopicId, fooTopicName, 6) -// .addTopic(barTopicId, barTopicName, 3) -// .addRacks() -// .build()) -// .build(); -// -// context.createStreamsGroup(groupId); -// -// assertThrows(GroupIdNotFoundException.class, () -> -// context.groupMetadataManager.consumerGroup(groupId)); -// -// final List<Subtopology> topology = Collections.singletonList( -// new Subtopology() -// .setSubtopology("subtopology-id") -// .setSinkTopics(Collections.singletonList("foo")) -// .setSourceTopics(Collections.singletonList("bar")) -// .setRepartitionSourceTopics( -// Collections.singletonList( -// new TopicInfo() -// .setName("repartition") -// .setPartitions(4) -// .setTopicConfigs(Collections.singletonList( -// new TopicConfig() -// .setKey("config-name1") -// .setValue("config-value1") -// )) -// ) -// ) -// .setStateChangelogTopics( -// Collections.singletonList( -// new TopicInfo() -// .setName("changelog") -// .setTopicConfigs(Collections.singletonList( -// new TopicConfig() -// .setKey("config-name2") -// .setValue("config-value2") -// )) -// ) -// ) -// ); -// CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = -// context.streamsGroupInitialize( -// new streamsGroupInitializeRequestData() -// .setGroupId(groupId) -// .setTopology(topology) -// ); -// -// assertEquals( -// new StreamsGroupInitializeResponseData(), -// result.response() -// ); -// -// List<CoordinatorRecord> expectedRecords = Arrays.asList( -// CoordinatorRecordHelpers.newStreamsGroupTopologyRecord( -// groupId, -// topology -// ) -// ); -// -// assertRecordsEquals(expectedRecords, result.records()); -// } -// -// @Test -// public void testTopologyInitializationMissingInternalTopics() { -// String groupId = "fooup"; -// -// Uuid fooTopicId = Uuid.randomUuid(); -// String fooTopicName = "repartition"; -// -// GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder() -// .withMetadataImage(new MetadataImageBuilder() -// .addTopic(fooTopicId, fooTopicName, 6) -// .addRacks() -// .build()) -// .build(); -// -// context.createStreamsGroup(groupId); -// -// assertThrows(GroupIdNotFoundException.class, () -> -// context.groupMetadataManager.consumerGroup(groupId)); -// -// final List<Subtopology> topology = Collections.singletonList( -// new Subtopology() -// .setSubtopology("subtopology-id") -// .setSinkTopics(Collections.singletonList("foo")) -// .setSourceTopics(Collections.singletonList("bar")) -// .setRepartitionSourceTopics( -// Collections.singletonList( -// new TopicInfo() -// .setName("repartition") -// .setPartitions(4) -// .setTopicConfigs(Collections.singletonList( -// new TopicConfig() -// .setKey("config-name1") -// .setValue("config-value1") -// )) -// ) -// ) -// .setStateChangelogTopics( -// Collections.singletonList( -// new TopicInfo() -// .setName("changelog") -// .setTopicConfigs(Collections.singletonList( -// new TopicConfig() -// .setKey("config-name2") -// .setValue("config-value2") -// )) -// ) -// ) -// ); -// CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = -// context.streamsGroupInitialize( -// new streamsGroupInitializeRequestData() -// .setGroupId(groupId) -// .setTopology(topology) -// ); -// -// assertEquals( -// new StreamsGroupInitializeResponseData() -// .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code()) -// .setErrorMessage("Internal topics changelog do not exist."), -// result.response() -// ); -// -// assertTrue(result.records().isEmpty()); -// -// } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index d77f5414ee9..96fd18344c0 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java 
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -109,6 +109,7 @@ import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder; import org.apache.kafka.coordinator.group.modern.share.ShareGroup; import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder; +import org.apache.kafka.coordinator.group.streams.StreamsGroup; import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder; import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor; import org.apache.kafka.image.MetadataImage; @@ -651,6 +652,14 @@ public class GroupMetadataManagerTestContext { .state(); } + public StreamsGroup.StreamsGroupState streamsGroupState( + String groupId + ) { + return groupMetadataManager + .streamsGroup(groupId) + .state(); + } + public MemberState consumerGroupMemberState( String groupId, String memberId diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java index cf1a003cd84..b9298dab3f3 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java @@ -34,7 +34,7 @@ class CoordinatorStreamsRecordHelpersTest { public void testNewStreamsGroupTopologyRecord() { List<StreamsGroupInitializeRequestData.Subtopology> topology = Collections.singletonList(new StreamsGroupInitializeRequestData.Subtopology() - .setSubtopology("subtopology-id") + .setSubtopologyId("subtopology-id") .setRepartitionSinkTopics(Collections.singletonList("foo")) .setSourceTopics(Collections.singletonList("bar")) .setRepartitionSourceTopics( diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java index b07dabc05e1..ed5c93d4b86 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java @@ -82,18 +82,18 @@ public class StreamsGroupBuilder { // Add target assignment records. assignments.forEach((memberId, assignment) -> records.add( - CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord(groupId, memberId, + CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, assignment.activeTasks(), assignment.standbyTasks(), assignment.warmupTasks())) ); // Add target assignment epoch. - records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentEpochRecord(groupId, + records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, assignmentEpoch)); // Add current assignment records for members. 
members.forEach((memberId, member) -> records.add( - CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentRecord(groupId, member)) + CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, member)) ); return records; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java index 2fd5d461c99..8d2a0bfa418 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -396,7 +396,7 @@ public class StreamsGroupTest { @Test public void testGroupState() { StreamsGroup streamsGroup = createStreamsGroup("foo"); - assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state()); + assertEquals(StreamsGroup.StreamsGroupState.INITIALIZING, streamsGroup.state()); StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1") .setState(MemberState.STABLE) @@ -407,6 +407,11 @@ public class StreamsGroupTest { streamsGroup.updateMember(member1); streamsGroup.setGroupEpoch(1); + assertEquals(MemberState.STABLE, member1.state()); + assertEquals(StreamsGroup.StreamsGroupState.INITIALIZING, streamsGroup.state()); + + streamsGroup.setTopology(new StreamsTopology("topology-id", Collections.emptyMap())); + assertEquals(MemberState.STABLE, member1.state()); assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, streamsGroup.state()); @@ -668,7 +673,7 @@ public class StreamsGroupTest { public void testValidateDeleteGroup() { StreamsGroup streamsGroup = createStreamsGroup("foo"); - assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state()); + assertEquals(StreamsGroup.StreamsGroupState.INITIALIZING, streamsGroup.state()); assertDoesNotThrow(streamsGroup::validateDeleteGroup); StreamsGroupMember member1 = new 
StreamsGroupMember.Builder("member1") @@ -676,6 +681,7 @@ public class StreamsGroupTest { .setPreviousMemberEpoch(0) .build(); streamsGroup.updateMember(member1); + streamsGroup.setTopology(new StreamsTopology("topology-id", Collections.emptyMap())); assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state()); assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup); @@ -689,6 +695,10 @@ public class StreamsGroupTest { assertEquals(StreamsGroup.StreamsGroupState.STABLE, streamsGroup.state()); assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup); + + streamsGroup.removeMember("member1"); + assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state()); + assertDoesNotThrow(streamsGroup::validateDeleteGroup); } @Test @@ -717,14 +727,13 @@ public class StreamsGroupTest { ); StreamsGroup group = new StreamsGroup(snapshotRegistry, "group-foo", metricsShard); snapshotRegistry.idempotentCreateSnapshot(0); - assertTrue(group.isInStates(Collections.singleton("empty"), 0)); - assertFalse(group.isInStates(Collections.singleton("Empty"), 0)); - - group.updateMember(new StreamsGroupMember.Builder("member1") - .build()); - snapshotRegistry.idempotentCreateSnapshot((1)); - assertTrue(group.isInStates(Collections.singleton("empty"), 0)); - assertTrue(group.isInStates(Collections.singleton("stable"), 1)); - assertFalse(group.isInStates(Collections.singleton("empty"), 1)); + assertTrue(group.isInStates(Collections.singleton("initializing"), 0)); + assertFalse(group.isInStates(Collections.singleton("Initializing"), 0)); + + group.setTopology(new StreamsTopology("topology-id", Collections.emptyMap())); + snapshotRegistry.idempotentCreateSnapshot(1); + assertTrue(group.isInStates(Collections.singleton("initializing"), 0)); + assertTrue(group.isInStates(Collections.singleton("empty"), 1)); + assertFalse(group.isInStates(Collections.singleton("initializing"), 1)); } } \ No newline at end of file diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java index c6b135eeebf..230e63b9058 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java @@ -40,8 +40,8 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.coordinator.group.Assertions.assertUnorderedListEquals; import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks; -import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentEpochRecord; -import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord; +import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord; import static org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec; import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTaskAssignment; @@ -299,7 +299,7 @@ public class TargetAssignmentBuilderTest { ); org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); - assertEquals(Collections.singletonList(newStreamsTargetAssignmentEpochRecord( + assertEquals(Collections.singletonList(newStreamsGroupTargetAssignmentEpochRecord( "my-group", 20 )), 
result.records()); @@ -338,7 +338,7 @@ public class TargetAssignmentBuilderTest { org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); - assertEquals(Collections.singletonList(newStreamsTargetAssignmentEpochRecord( + assertEquals(Collections.singletonList(newStreamsGroupTargetAssignmentEpochRecord( "my-group", 20 )), result.records()); @@ -391,17 +391,17 @@ public class TargetAssignmentBuilderTest { assertEquals(3, result.records().size()); assertUnorderedListEquals(Arrays.asList( - newStreamsTargetAssignmentRecord("my-group", "member-1", mkAssignment( + newStreamsGroupTargetAssignmentRecord("my-group", "member-1", mkAssignment( mkTaskAssignment(fooSubtopologyId, 4, 5, 6), mkTaskAssignment(barSubtopologyId, 4, 5, 6) ), Collections.emptyMap(), Collections.emptyMap()), - newStreamsTargetAssignmentRecord("my-group", "member-2", mkAssignment( + newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkAssignment( mkTaskAssignment(fooSubtopologyId, 1, 2, 3), mkTaskAssignment(barSubtopologyId, 1, 2, 3) ), Collections.emptyMap(), Collections.emptyMap()) ), result.records().subList(0, 2)); - assertEquals(newStreamsTargetAssignmentEpochRecord( + assertEquals(newStreamsGroupTargetAssignmentEpochRecord( "my-group", 20 ), result.records().get(2)); @@ -461,21 +461,21 @@ public class TargetAssignmentBuilderTest { assertEquals(4, result.records().size()); assertUnorderedListEquals(Arrays.asList( - newStreamsTargetAssignmentRecord("my-group", "member-1", mkAssignment( + newStreamsGroupTargetAssignmentRecord("my-group", "member-1", mkAssignment( mkTaskAssignment(fooSubtopologyId, 1, 2), mkTaskAssignment(barSubtopologyId, 1, 2) ), Collections.emptyMap(), Collections.emptyMap()), - newStreamsTargetAssignmentRecord("my-group", "member-2", mkAssignment( + newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkAssignment( mkTaskAssignment(fooSubtopologyId, 3, 4), mkTaskAssignment(barSubtopologyId, 3, 4) ), 
Collections.emptyMap(), Collections.emptyMap()), - newStreamsTargetAssignmentRecord("my-group", "member-3", mkAssignment( + newStreamsGroupTargetAssignmentRecord("my-group", "member-3", mkAssignment( mkTaskAssignment(fooSubtopologyId, 5, 6), mkTaskAssignment(barSubtopologyId, 5, 6) ), Collections.emptyMap(), Collections.emptyMap()) ), result.records().subList(0, 3)); - assertEquals(newStreamsTargetAssignmentEpochRecord( + assertEquals(newStreamsGroupTargetAssignmentEpochRecord( "my-group", 20 ), result.records().get(3)); @@ -547,21 +547,21 @@ public class TargetAssignmentBuilderTest { assertEquals(4, result.records().size()); assertUnorderedListEquals(Arrays.asList( - newStreamsTargetAssignmentRecord("my-group", "member-1", mkAssignment( + newStreamsGroupTargetAssignmentRecord("my-group", "member-1", mkAssignment( mkTaskAssignment(fooSubtopologyId, 1, 2), mkTaskAssignment(barSubtopologyId, 1, 2) ), Collections.emptyMap(), Collections.emptyMap()), - newStreamsTargetAssignmentRecord("my-group", "member-2", mkAssignment( + newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkAssignment( mkTaskAssignment(fooSubtopologyId, 3, 4), mkTaskAssignment(barSubtopologyId, 3, 4) ), Collections.emptyMap(), Collections.emptyMap()), - newStreamsTargetAssignmentRecord("my-group", "member-3", mkAssignment( + newStreamsGroupTargetAssignmentRecord("my-group", "member-3", mkAssignment( mkTaskAssignment(fooSubtopologyId, 5, 6), mkTaskAssignment(barSubtopologyId, 5, 6) ), Collections.emptyMap(), Collections.emptyMap()) ), result.records().subList(0, 3)); - assertEquals(newStreamsTargetAssignmentEpochRecord( + assertEquals(newStreamsGroupTargetAssignmentEpochRecord( "my-group", 20 ), result.records().get(3)); @@ -629,17 +629,17 @@ public class TargetAssignmentBuilderTest { // Member 1 has no record because its assignment did not change. 
assertUnorderedListEquals(Arrays.asList( - newStreamsTargetAssignmentRecord("my-group", "member-2", mkAssignment( + newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkAssignment( mkTaskAssignment(fooSubtopologyId, 3, 4, 5), mkTaskAssignment(barSubtopologyId, 3, 4, 5) ), Collections.emptyMap(), Collections.emptyMap()), - newStreamsTargetAssignmentRecord("my-group", "member-3", mkAssignment( + newStreamsGroupTargetAssignmentRecord("my-group", "member-3", mkAssignment( mkTaskAssignment(fooSubtopologyId, 6), mkTaskAssignment(barSubtopologyId, 6) ), Collections.emptyMap(), Collections.emptyMap()) ), result.records().subList(0, 2)); - assertEquals(newStreamsTargetAssignmentEpochRecord( + assertEquals(newStreamsGroupTargetAssignmentEpochRecord( "my-group", 20 ), result.records().get(2)); @@ -703,17 +703,17 @@ public class TargetAssignmentBuilderTest { assertEquals(3, result.records().size()); assertUnorderedListEquals(Arrays.asList( - newStreamsTargetAssignmentRecord("my-group", "member-1", mkAssignment( + newStreamsGroupTargetAssignmentRecord("my-group", "member-1", mkAssignment( mkTaskAssignment(fooSubtopologyId, 1, 2, 3), mkTaskAssignment(barSubtopologyId, 1, 2, 3) ), Collections.emptyMap(), Collections.emptyMap()), - newStreamsTargetAssignmentRecord("my-group", "member-2", mkAssignment( + newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkAssignment( mkTaskAssignment(fooSubtopologyId, 4, 5, 6), mkTaskAssignment(barSubtopologyId, 4, 5, 6) ), Collections.emptyMap(), Collections.emptyMap()) ), result.records().subList(0, 2)); - assertEquals(newStreamsTargetAssignmentEpochRecord( + assertEquals(newStreamsGroupTargetAssignmentEpochRecord( "my-group", 20 ), result.records().get(2)); @@ -783,13 +783,13 @@ public class TargetAssignmentBuilderTest { assertEquals(2, result.records().size()); assertUnorderedListEquals(Collections.singletonList( - newStreamsTargetAssignmentRecord("my-group", "member-3-a", mkAssignment( + 
newStreamsGroupTargetAssignmentRecord("my-group", "member-3-a", mkAssignment( mkTaskAssignment(fooSubtopologyId, 5, 6), mkTaskAssignment(barSubtopologyId, 5, 6) ), Collections.emptyMap(), Collections.emptyMap()) ), result.records().subList(0, 1)); - assertEquals(newStreamsTargetAssignmentEpochRecord( + assertEquals(newStreamsGroupTargetAssignmentEpochRecord( "my-group", 20 ), result.records().get(1));
