This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 9b4480292741a47fc4a2e5c9f639a634918303c7 Author: Bruno Cadonna <[email protected]> AuthorDate: Fri Jun 7 15:34:07 2024 +0200 Add streams group member See https://github.com/lucasbru/kafka/pull/19 --- .../org/apache/kafka/coordinator/group/Group.java | 1 + .../group/GroupCoordinatorRecordHelpers.java | 162 +++ .../coordinator/group/streams/StreamsGroup.java | 1104 ++++++++++++++++++++ .../group/streams/StreamsGroupMember.java | 704 +++++++++++++ .../group/streams/SubscribedTopicMetadata.java | 98 ++ .../StreamsGroupCurrentMemberAssignmentValue.json | 2 +- .../group/streams/StreamsGroupMemberTest.java | 298 ++++++ .../group/streams/StreamsGroupTest.java | 730 +++++++++++++ 8 files changed, 3098 insertions(+), 1 deletion(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java index b5d63499751..79c1b72237b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java @@ -37,6 +37,7 @@ public interface Group { CONSUMER("consumer"), CLASSIC("classic"), SHARE("share"), + STREAMS("streams"), UNKNOWN("unknown"); private final String name; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java index 98cf7f937e8..ea657361480 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java @@ -52,12 +52,23 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression; import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember; +import org.apache.kafka.coordinator.group.streams.StreamsGroupMember; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.MetadataVersion; @@ -112,6 +123,35 @@ public class GroupCoordinatorRecordHelpers { ); } + public static CoordinatorRecord newStreamsGroupMemberRecord( + String groupId, + StreamsGroupMember member + ) { + return new CoordinatorRecord( + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + (short) 11 + ), + new ApiMessageAndVersion( + new StreamsGroupMemberMetadataValue() + .setRackId(member.rackId()) + .setInstanceId(member.instanceId()) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setTopologyHash(member.topologyHash()) + .setProcessId(member.processId()) + .setHostInfo(member.hostInfo()) + .setClientTags(member.clientTags()) + .setUserData(member.userData()) + .setAssignmentConfigs(member.assignmentConfigs()), + (short) 0 + ) + ); + } + /** * Creates a ConsumerGroupMemberMetadata tombstone. * @@ -211,6 +251,24 @@ public class GroupCoordinatorRecordHelpers { ); } + public static CoordinatorRecord newStreamsGroupEpochRecord( + String groupId, + int newGroupEpoch + ) { + return new CoordinatorRecord( + new ApiMessageAndVersion( + new StreamsGroupMetadataKey() + .setGroupId(groupId), + (short) 9 + ), + new ApiMessageAndVersion( + new StreamsGroupMetadataValue() + .setEpoch(newGroupEpoch), + (short) 0 + ) + ); + } + /** * Creates a ConsumerGroupMetadata tombstone. * @@ -269,6 +327,55 @@ public class GroupCoordinatorRecordHelpers { ); } + public static CoordinatorRecord newTargetAssignmentRecord( + String groupId, + String memberId, + Map<String, Set<Integer>> activeTasks, + Map<String, Set<Integer>> standbyTasks, + Map<String, Set<Integer>> warmupTasks + ) { + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = new ArrayList<>(activeTasks.size()); + for (Map.Entry<String, Set<Integer>> entry : activeTasks.entrySet()) { + activeTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopology(entry.getKey()) + .setPartitions(new ArrayList<>(entry.getValue())) + ); + } + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = new ArrayList<>(standbyTasks.size()); + for (Map.Entry<String, Set<Integer>> entry : standbyTasks.entrySet()) { + standbyTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopology(entry.getKey()) + .setPartitions(new ArrayList<>(entry.getValue())) + ); + } + List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = new ArrayList<>(warmupTasks.size()); + for (Map.Entry<String, Set<Integer>> entry : warmupTasks.entrySet()) { + warmupTaskIds.add( + new StreamsGroupTargetAssignmentMemberValue.TaskIds() + .setSubtopology(entry.getKey()) + .setPartitions(new ArrayList<>(entry.getValue())) + ); + } + + return new CoordinatorRecord( + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId), + (short) 13 + ), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMemberValue() + .setActiveTasks(activeTaskIds) + .setStandbyTasks(standbyTaskIds) + .setWarmupTasks(warmupTaskIds), + (short) 0 + ) + ); + } + /** * Creates a ConsumerGroupTargetAssignmentMember tombstone. * @@ -316,6 +423,24 @@ public class GroupCoordinatorRecordHelpers { ); } + public static CoordinatorRecord newStreamsTargetAssignmentEpochRecord( + String groupId, + int assignmentEpoch + ) { + return new CoordinatorRecord( + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMetadataKey() + .setGroupId(groupId), + (short) 12 + ), + new ApiMessageAndVersion( + new StreamsGroupTargetAssignmentMetadataValue() + .setAssignmentEpoch(assignmentEpoch), + (short) 0 + ) + ); + } + /** * Creates a ConsumerGroupTargetAssignmentMetadata tombstone. * @@ -365,6 +490,31 @@ public class GroupCoordinatorRecordHelpers { ); } + public static CoordinatorRecord newCurrentAssignmentRecord( + String groupId, + StreamsGroupMember member + ) { + return new CoordinatorRecord( + new ApiMessageAndVersion( + new StreamsGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(member.memberId()), + (short) 14 + ), + new ApiMessageAndVersion( + new StreamsGroupCurrentMemberAssignmentValue() + .setMemberEpoch(member.memberEpoch()) + .setPreviousMemberEpoch(member.previousMemberEpoch()) + .setState(member.state().value()) + .setActiveTasks(toTaskIds(member.assignedActiveTasks())) + .setStandbyTasks(toTaskIds(member.assignedStandbyTasks())) + .setWarmupTasks(toTaskIds(member.assignedWarmupTasks())) + .setActiveTasksPendingRevocation(toTaskIds(member.activeTasksPendingRevocation())), + (short) 0 + ) + ); + } + /** * Creates a ConsumerGroupCurrentMemberAssignment tombstone. * @@ -943,6 +1093,18 @@ public class GroupCoordinatorRecordHelpers { return topics; } + private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> toTaskIds( + Map<String, Set<Integer>> tasks + ) { + List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new ArrayList<>(tasks.size()); + tasks.forEach((subtopologyId, partitions) -> + taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopology(subtopologyId) + .setPartitions(new ArrayList<>(partitions))) + ); + return taskIds; + } + /** * Creates a StreamsTopology record. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java new file mode 100644 index 00000000000..d28e9218994 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -0,0 +1,1104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.StaleMemberEpochException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.coordinator.group.CoordinatorRecord; +import org.apache.kafka.coordinator.group.CoordinatorRecordHelpers; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.OffsetExpirationCondition; +import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; +import org.apache.kafka.coordinator.group.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; +import org.apache.kafka.image.ClusterImage; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineInteger; +import org.apache.kafka.timeline.TimelineObject; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static java.util.Collections.emptyMap; +import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS; +import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; +import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING; +import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING; +import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE; + +/** + * A Streams Group. All the metadata in this class are backed by + * records in the __consumer_offsets partitions. + */ +public class StreamsGroup implements Group { + + public enum StreamsGroupState { + EMPTY("Empty"), + ASSIGNING("Assigning"), + RECONCILING("Reconciling"), + STABLE("Stable"), + DEAD("Dead"); + + private final String name; + + private final String lowerCaseName; + + StreamsGroupState(String name) { + this.name = name; + this.lowerCaseName = name.toLowerCase(Locale.ROOT); + } + + @Override + public String toString() { + return name; + } + + public String toLowerCaseString() { + return lowerCaseName; + } + } + + public static class DeadlineAndEpoch { + static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0); + + public final long deadlineMs; + public final int epoch; + + DeadlineAndEpoch(long deadlineMs, int epoch) { + this.deadlineMs = deadlineMs; + this.epoch = epoch; + } + } + + /** + * The snapshot registry. + */ + private final SnapshotRegistry snapshotRegistry; + + /** + * The group id. + */ + private final String groupId; + + /** + * The group state. + */ + private final TimelineObject<StreamsGroupState> state; + + /** + * The group epoch. The epoch is incremented whenever the subscriptions + * are updated and it will trigger the computation of a new assignment + * for the group. + */ + private final TimelineInteger groupEpoch; + + /** + * The group members. + */ + private final TimelineHashMap<String, StreamsGroupMember> members; + + /** + * The static group members. + */ + private final TimelineHashMap<String, String> staticMembers; + + /** + * The number of members supporting each assignor name. + */ + private final TimelineHashMap<String, Integer> assignors; + + /** + * The target assignment epoch. An assignment epoch smaller than the group epoch + * means that a new assignment is required. The assignment epoch is updated when + * a new assignment is installed. + */ + private final TimelineInteger targetAssignmentEpoch; + + /** + * The target assignment per member id. + */ + private final TimelineHashMap<String, Assignment> targetAssignment; + + /** + * Reverse lookup map representing tasks with + * their current member assignments. + */ + private final TimelineHashMap<String, TimelineHashMap<Integer, String>> invertedTargetActiveTasksAssignment; + private final TimelineHashMap<String, TimelineHashMap<Integer, String>> invertedTargetStandbyTasksAssignment; + private final TimelineHashMap<String, TimelineHashMap<Integer, String>> invertedTargetWarmupTasksAssignment; + + /** + * The current partition epoch maps each topic-partitions to their current epoch where + * the epoch is the epoch of their owners. When a member revokes a partition, it removes + * its epochs from this map. When a member gets a partition, it adds its epochs to this map. + */ + private final TimelineHashMap<String, TimelineHashMap<Integer, Integer>> currentActiveTasksEpoch; + private final TimelineHashMap<String, TimelineHashMap<Integer, Integer>> currentStandbyTasksEpoch; + private final TimelineHashMap<String, TimelineHashMap<Integer, Integer>> currentWarmupTasksEpoch; + + /** + * The coordinator metrics. + */ + private final GroupCoordinatorMetricsShard metrics; + + /** + * The metadata refresh deadline. It consists of a timestamp in milliseconds together with + * the group epoch at the time of setting it. The metadata refresh time is considered as a + * soft state (read that it is not stored in a timeline data structure). It is like this + * because it is not persisted to the log. The group epoch is here to ensure that the + * metadata refresh deadline is invalidated if the group epoch does not correspond to + * the current group epoch. This can happen if the metadata refresh deadline is updated + * after having refreshed the metadata but the write operation failed. In this case, the + * time is not automatically rolled back. + */ + private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY; + + public StreamsGroup( + SnapshotRegistry snapshotRegistry, + String groupId, + GroupCoordinatorMetricsShard metrics + ) { + this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry); + this.groupId = Objects.requireNonNull(groupId); + this.state = new TimelineObject<>(snapshotRegistry, EMPTY); + this.groupEpoch = new TimelineInteger(snapshotRegistry); + this.members = new TimelineHashMap<>(snapshotRegistry, 0); + this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0); + this.assignors = new TimelineHashMap<>(snapshotRegistry, 0); + this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry); + this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0); + this.invertedTargetActiveTasksAssignment = new TimelineHashMap<>(snapshotRegistry, 0); + this.invertedTargetStandbyTasksAssignment = new TimelineHashMap<>(snapshotRegistry, 0); + this.invertedTargetWarmupTasksAssignment = new TimelineHashMap<>(snapshotRegistry, 0); + this.currentActiveTasksEpoch = new TimelineHashMap<>(snapshotRegistry, 0); + this.currentStandbyTasksEpoch = new TimelineHashMap<>(snapshotRegistry, 0); + this.currentWarmupTasksEpoch = new TimelineHashMap<>(snapshotRegistry, 0); + this.metrics = Objects.requireNonNull(metrics); + } + + /** + * @return The group type (Streams). + */ + @Override + public GroupType type() { + return GroupType.STREAMS; + } + + /** + * @return The current state as a String. + */ + @Override + public String stateAsString() { + return state.get().toString(); + } + + /** + * @return The current state as a String with given committedOffset. + */ + public String stateAsString(long committedOffset) { + return state.get(committedOffset).toString(); + } + + /** + * @return the group formatted as a list group response based on the committed offset. + */ + public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset) { + return new ListGroupsResponseData.ListedGroup() + .setGroupId(groupId) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState(state.get(committedOffset).toString()) + .setGroupType(type().toString()); + } + + /** + * @return The group id. + */ + @Override + public String groupId() { + return groupId; + } + + /** + * @return The current state. + */ + public StreamsGroupState state() { + return state.get(); + } + + /** + * @return The current state based on committed offset. + */ + public StreamsGroupState state(long committedOffset) { + return state.get(committedOffset); + } + + /** + * @return The group epoch. + */ + public int groupEpoch() { + return groupEpoch.get(); + } + + /** + * Sets the group epoch. + * + * @param groupEpoch The new group epoch. + */ + public void setGroupEpoch(int groupEpoch) { + this.groupEpoch.set(groupEpoch); + maybeUpdateGroupState(); + } + + /** + * @return The target assignment epoch. + */ + public int assignmentEpoch() { + return targetAssignmentEpoch.get(); + } + + /** + * Sets the assignment epoch. + * + * @param targetAssignmentEpoch The new assignment epoch. + */ + public void setTargetAssignmentEpoch(int targetAssignmentEpoch) { + this.targetAssignmentEpoch.set(targetAssignmentEpoch); + maybeUpdateGroupState(); + } + + /** + * Get member id of a static member that matches the given group + * instance id. + * + * @param groupInstanceId The group instance id. + * + * @return The member id corresponding to the given instance id or null if it does not exist + */ + public String staticMemberId(String groupInstanceId) { + return staticMembers.get(groupInstanceId); + } + + /** + * Gets or creates a new member but without adding it to the group. Adding a member + * is done via the {@link StreamsGroup#updateMember(StreamsGroupMember)} method. + * + * @param memberId The member id. + * @param createIfNotExists Booleans indicating whether the member must be + * created if it does not exist. + * + * @return A StreamsGroupMember. + */ + public StreamsGroupMember getOrMaybeCreateMember( + String memberId, + boolean createIfNotExists + ) { + StreamsGroupMember member = members.get(memberId); + if (member != null) return member; + + if (!createIfNotExists) { + throw new UnknownMemberIdException( + String.format("Member %s is not a member of group %s.", memberId, groupId) + ); + } + + return new StreamsGroupMember.Builder(memberId).build(); + } + + /** + * Gets a static member. + * + * @param instanceId The group instance id. + * + * @return The member corresponding to the given instance id or null if it does not exist + */ + public StreamsGroupMember staticMember(String instanceId) { + String existingMemberId = staticMemberId(instanceId); + return existingMemberId == null ? null : getOrMaybeCreateMember(existingMemberId, false); + } + + /** + * Adds or updates the member. + * + * @param newMember The new member state. + */ + public void updateMember(StreamsGroupMember newMember) { + if (newMember == null) { + throw new IllegalArgumentException("newMember cannot be null."); + } + StreamsGroupMember oldMember = members.put(newMember.memberId(), newMember); + maybeUpdateTaskEpoch(oldMember, newMember); + updateStaticMember(newMember); + maybeUpdateGroupState(); + maybeUpdateAssignors(oldMember, newMember); + } + + /** + * Updates the member id stored against the instance id if the member is a static member. + * + * @param newMember The new member state. + */ + private void updateStaticMember(StreamsGroupMember newMember) { + if (newMember.instanceId() != null) { + staticMembers.put(newMember.instanceId(), newMember.memberId()); + } + } + + /** + * Remove the member from the group. + * + * @param memberId The member id to remove. + */ + public void removeMember(String memberId) { + StreamsGroupMember oldMember = members.remove(memberId); + maybeRemoveTaskEpoch(oldMember); + removeStaticMember(oldMember); + maybeUpdateGroupState(); + } + + /** + * Remove the static member mapping if the removed member is static. + * + * @param oldMember The member to remove. + */ + private void removeStaticMember(StreamsGroupMember oldMember) { + if (oldMember.instanceId() != null) { + staticMembers.remove(oldMember.instanceId()); + } + } + + /** + * Returns true if the member exists. + * + * @param memberId The member id. + * + * @return A boolean indicating whether the member exists or not. + */ + public boolean hasMember(String memberId) { + return members.containsKey(memberId); + } + + /** + * @return The number of members. + */ + public int numMembers() { + return members.size(); + } + + /** + * @return An immutable Map containing all the members keyed by their id. + */ + public Map<String, StreamsGroupMember> members() { + return Collections.unmodifiableMap(members); + } + + /** + * @return An immutable Map containing all the static members keyed by instance id. + */ + public Map<String, String> staticMembers() { + return Collections.unmodifiableMap(staticMembers); + } + + /** + * Returns the target assignment of the member. + * + * @return The StreamsGroupMemberAssignment or an EMPTY one if it does not + * exist. + */ + public Assignment targetAssignment(String memberId) { + return targetAssignment.getOrDefault(memberId, Assignment.EMPTY); + } + + /** + * @return An immutable map containing all the topic partitions + * with their current member assignments. + */ + public Map<String, Map<Integer, String>> invertedTargetActiveTasksAssignment() { + return Collections.unmodifiableMap(invertedTargetActiveTasksAssignment); + } + + public Map<String, Map<Integer, String>> invertedTargetStandbyTasksAssignment() { + return Collections.unmodifiableMap(invertedTargetStandbyTasksAssignment); + } + + public Map<String, Map<Integer, String>> invertedTargetWarmupTasksAssignment() { + return Collections.unmodifiableMap(invertedTargetWarmupTasksAssignment); + } + + /** + * Updates the server assignors count. + * + * @param oldMember The old member. + * @param newMember The new member. + */ + private void maybeUpdateAssignors( + StreamsGroupMember oldMember, + StreamsGroupMember newMember + ) { + maybeUpdateAssignors(assignors, oldMember, newMember); + } + + private static void maybeUpdateAssignors( + Map<String, Integer> serverAssignorCount, + StreamsGroupMember oldMember, + StreamsGroupMember newMember + ) { + if (oldMember != null) { + oldMember.assignor().ifPresent(name -> + serverAssignorCount.compute(name, StreamsGroup::decValue) + ); + } + if (newMember != null) { + newMember.assignor().ifPresent(name -> + serverAssignorCount.compute(name, StreamsGroup::incValue) + ); + } + } + + /** + * Updates the target assignment of a member. + * + * @param memberId The member id. + * @param newTargetAssignment The new target assignment. + */ + public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) { + updateInvertedTargetActiveTasksAssignment( + memberId, + targetAssignment.getOrDefault(memberId, new Assignment(emptyMap(), emptyMap(), emptyMap())), + newTargetAssignment + ); + updateInvertedTargetStandbyTasksAssignment( + memberId, + targetAssignment.getOrDefault(memberId, new Assignment(emptyMap(), emptyMap(), emptyMap())), + newTargetAssignment + ); + updateInvertedTargetWarmupTasksAssignment( + memberId, + targetAssignment.getOrDefault(memberId, new Assignment(emptyMap(), emptyMap(), emptyMap())), + newTargetAssignment + ); + targetAssignment.put(memberId, newTargetAssignment); + } + + + private void updateInvertedTargetActiveTasksAssignment( + String memberId, + Assignment oldTargetAssignment, + Assignment newTargetAssignment + ) { + updateInvertedTargetAssignment( + memberId, + oldTargetAssignment, + newTargetAssignment, + invertedTargetActiveTasksAssignment + ); + } + + private void updateInvertedTargetStandbyTasksAssignment( + String memberId, + Assignment oldTargetAssignment, + Assignment newTargetAssignment + ) { + updateInvertedTargetAssignment( + memberId, + oldTargetAssignment, + newTargetAssignment, + invertedTargetStandbyTasksAssignment + ); + } + + private void updateInvertedTargetWarmupTasksAssignment( + String memberId, + Assignment oldTargetAssignment, + Assignment newTargetAssignment + ) { + updateInvertedTargetAssignment( + memberId, + oldTargetAssignment, + newTargetAssignment, + invertedTargetWarmupTasksAssignment + ); + } + + /** + * Updates the reverse lookup map of the target assignment. + * + * @param memberId The member Id. + * @param oldTargetAssignment The old target assignment. + * @param newTargetAssignment The new target assignment. + */ + private void updateInvertedTargetAssignment( + String memberId, + Assignment oldTargetAssignment, + Assignment newTargetAssignment, + TimelineHashMap<String, TimelineHashMap<Integer, String>> invertedTargetAssignment + ) { + // Combine keys from both old and new assignments. + Set<String> allSubtopologyIds = new HashSet<>(); + allSubtopologyIds.addAll(oldTargetAssignment.activeTasks().keySet()); + allSubtopologyIds.addAll(newTargetAssignment.activeTasks().keySet()); + + for (String subtopologyId : allSubtopologyIds) { + Set<Integer> oldPartitions = oldTargetAssignment.activeTasks().getOrDefault(subtopologyId, Collections.emptySet()); + Set<Integer> newPartitions = newTargetAssignment.activeTasks().getOrDefault(subtopologyId, Collections.emptySet()); + + TimelineHashMap<Integer, String> taskPartitionAssignment = invertedTargetAssignment.computeIfAbsent( + subtopologyId, k -> new TimelineHashMap<>(snapshotRegistry, Math.max(oldPartitions.size(), newPartitions.size())) + ); + + // Remove partitions that aren't present in the new assignment only if the partition is currently + // still assigned to the member in question. + // If p0 was moved from A to B, and the target assignment map was updated for B first, we don't want to + // remove the key p0 from the inverted map and undo the action when A eventually tries to update its assignment. + for (Integer partition : oldPartitions) { + if (!newPartitions.contains(partition) && memberId.equals(taskPartitionAssignment.get(partition))) { + taskPartitionAssignment.remove(partition); + } + } + + // Add partitions that are in the new assignment but not in the old assignment. + for (Integer partition : newPartitions) { + if (!oldPartitions.contains(partition)) { + taskPartitionAssignment.put(partition, memberId); + } + } + + if (taskPartitionAssignment.isEmpty()) { + invertedTargetAssignment.remove(subtopologyId); + } else { + invertedTargetAssignment.put(subtopologyId, taskPartitionAssignment); + } + } + } + + /** + * Removes the target assignment of a member. + * + * @param memberId The member id. + */ + public void removeTargetAssignment(String memberId) { + updateInvertedTargetActiveTasksAssignment( + memberId, + targetAssignment.getOrDefault(memberId, Assignment.EMPTY), + Assignment.EMPTY + ); + updateInvertedTargetStandbyTasksAssignment( + memberId, + targetAssignment.getOrDefault(memberId, Assignment.EMPTY), + Assignment.EMPTY + ); + updateInvertedTargetWarmupTasksAssignment( + memberId, + targetAssignment.getOrDefault(memberId, Assignment.EMPTY), + Assignment.EMPTY + ); + targetAssignment.remove(memberId); + } + + /** + * @return An immutable Map containing all the target assignment keyed by member id. + */ + public Map<String, Assignment> targetAssignment() { + return Collections.unmodifiableMap(targetAssignment); + } + + /** + * Returns the current epoch of a partition or -1 if the partition + * does not have one. + * + * @param topicId The topic id. + * @param partitionId The partition id. + * + * @return The epoch or -1. + */ + public int currentActiveTaskEpoch( + String topicId, int partitionId + ) { + Map<Integer, Integer> partitions = currentActiveTasksEpoch.get(topicId); + if (partitions == null) { + return -1; + } else { + return partitions.getOrDefault(partitionId, -1); + } + } + + /** + * Updates the metadata refresh deadline. + * + * @param deadlineMs The deadline in milliseconds. + * @param groupEpoch The associated group epoch. + */ + public void setMetadataRefreshDeadline( + long deadlineMs, + int groupEpoch + ) { + this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, groupEpoch); + } + + /** + * Requests a metadata refresh. + */ + public void requestMetadataRefresh() { + this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY; + } + + /** + * Checks if a metadata refresh is required. A refresh is required in two cases: + * 1) The deadline is smaller or equal to the current time; + * 2) The group epoch associated with the deadline is larger than + * the current group epoch. This means that the operations which updated + * the deadline failed. + * + * @param currentTimeMs The current time in milliseconds. + * @return A boolean indicating whether a refresh is required or not. + */ + public boolean hasMetadataExpired(long currentTimeMs) { + return currentTimeMs >= metadataRefreshDeadline.deadlineMs || groupEpoch() < metadataRefreshDeadline.epoch; + } + + /** + * @return The metadata refresh deadline. + */ + public DeadlineAndEpoch metadataRefreshDeadline() { + return metadataRefreshDeadline; + } + + /** + * Validates the OffsetCommit request. + * + * @param memberId The member id. + * @param groupInstanceId The group instance id. + * @param memberEpoch The member epoch. + * @param isTransactional Whether the offset commit is transactional or not. It has no + * impact when a consumer group is used. + */ + @Override + public void validateOffsetCommit( + String memberId, + String groupInstanceId, + int memberEpoch, + boolean isTransactional + ) throws UnknownMemberIdException, StaleMemberEpochException { + // When the member epoch is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. In this case, + // the request can commit offsets if the group is empty. + if (memberEpoch < 0 && members().isEmpty()) return; + + final StreamsGroupMember member = getOrMaybeCreateMember(memberId, false); + validateMemberEpoch(memberEpoch, member.memberEpoch()); + } + + /** + * Validates the OffsetFetch request. + * + * @param memberId The member id for consumer groups. + * @param memberEpoch The member epoch for consumer groups. + * @param lastCommittedOffset The last committed offsets in the timeline. + */ + @Override + public void validateOffsetFetch( + String memberId, + int memberEpoch, + long lastCommittedOffset + ) throws UnknownMemberIdException, StaleMemberEpochException { + // When the member id is null and the member epoch is -1, the request either comes + // from the admin client or from a client which does not provide them. In this case, + // the fetch request is accepted. + if (memberId == null && memberEpoch < 0) return; + + final StreamsGroupMember member = members.get(memberId, lastCommittedOffset); + if (member == null) { + throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.", + memberId, groupId)); + } + validateMemberEpoch(memberEpoch, member.memberEpoch()); + } + + /** + * Validates the OffsetDelete request. + */ + @Override + public void validateOffsetDelete() {} + + /** + * Validates the DeleteGroups request. + */ + @Override + public void validateDeleteGroup() throws ApiException { + if (state() != StreamsGroupState.EMPTY) { + throw Errors.NON_EMPTY_GROUP.exception(); + } + } + + @Override + public boolean isSubscribedToTopic(String topic) { + return false; + } + + /** + * Populates the list of records with tombstone(s) for deleting the group. + * + * @param records The list of records. + */ + @Override + public void createGroupTombstoneRecords(List<CoordinatorRecord> records) { + members().forEach((memberId, member) -> + records.add(CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId(), memberId)) + ); + + members().forEach((memberId, member) -> + records.add(CoordinatorRecordHelpers.newTargetAssignmentTombstoneRecord(groupId(), memberId)) + ); + records.add(CoordinatorRecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId())); + + members().forEach((memberId, member) -> + records.add(CoordinatorRecordHelpers.newMemberSubscriptionTombstoneRecord(groupId(), memberId)) + ); + + records.add(CoordinatorRecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId())); + records.add(CoordinatorRecordHelpers.newGroupEpochTombstoneRecord(groupId())); + } + + @Override + public boolean isEmpty() { + return state() == StreamsGroupState.EMPTY; + } + + /** + * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition} + * + * @return The offset expiration condition for the group or Empty if no such condition exists. + */ + @Override + public Optional<OffsetExpirationCondition> offsetExpirationCondition() { + return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)); + } + + @Override + public boolean isInStates(Set<String> statesFilter, long committedOffset) { + return statesFilter.contains(state.get(committedOffset).toLowerCaseString()); + } + + /** + * Throws a StaleMemberEpochException if the received member epoch does not match + * the expected member epoch. + */ + private void validateMemberEpoch( + int receivedMemberEpoch, + int expectedMemberEpoch + ) throws StaleMemberEpochException { + if (receivedMemberEpoch != expectedMemberEpoch) { + throw new StaleMemberEpochException(String.format("The received member epoch %d does not match " + + "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch)); + } + } + + /** + * Updates the current state of the group. + */ + private void maybeUpdateGroupState() { + StreamsGroupState previousState = state.get(); + StreamsGroupState newState = STABLE; + if (members.isEmpty()) { + newState = EMPTY; + } else if (groupEpoch.get() > targetAssignmentEpoch.get()) { + newState = ASSIGNING; + } else { + for (StreamsGroupMember member : members.values()) { + if (!member.isReconciledTo(targetAssignmentEpoch.get())) { + newState = RECONCILING; + break; + } + } + } + + state.set(newState); + } + + /** + * Compute the subscription type of the consumer group. + * + * @param subscribedTopicNames A map of topic names to the count of members subscribed to each topic. + * + * @return {@link org.apache.kafka.coordinator.group.assignor.SubscriptionType#HOMOGENEOUS} if all members are subscribed to exactly the same topics; + * otherwise, {@link org.apache.kafka.coordinator.group.assignor.SubscriptionType#HETEROGENEOUS}. + */ + public static SubscriptionType subscriptionType( + Map<String, Integer> subscribedTopicNames, + int numberOfMembers + ) { + if (subscribedTopicNames.isEmpty()) { + return HOMOGENEOUS; + } + + for (int subscriberCount : subscribedTopicNames.values()) { + if (subscriberCount != numberOfMembers) { + return HETEROGENEOUS; + } + } + return HOMOGENEOUS; + } + + /** + * Updates the tasks epochs based on the old and the new member. + * + * @param oldMember The old member. + * @param newMember The new member. + */ + private void maybeUpdateTaskEpoch( + StreamsGroupMember oldMember, + StreamsGroupMember newMember + ) { + maybeRemoveTaskEpoch(oldMember); + addTaskEpochs( + newMember.assignedActiveTasks(), + newMember.assignedStandbyTasks(), + newMember.assignedWarmupTasks(), + newMember.memberEpoch() + ); + addTaskEpochs( + newMember.activeTasksPendingRevocation(), + emptyMap(), + emptyMap(), + newMember.memberEpoch() + ); + } + + /** + * Removes the task epochs for the provided member. + * + * @param oldMember The old member. + */ + private void maybeRemoveTaskEpoch( + StreamsGroupMember oldMember + ) { + if (oldMember != null) { + removeActiveTaskEpochs(oldMember.assignedActiveTasks(), oldMember.memberEpoch()); + removeStandbyTaskEpochs(oldMember.assignedStandbyTasks(), oldMember.memberEpoch()); + removeWarmupTaskEpochs(oldMember.assignedWarmupTasks(), oldMember.memberEpoch()); + removeActiveTaskEpochs(oldMember.activeTasksPendingRevocation(), oldMember.memberEpoch()); + } + } + + void removeActiveTaskEpochs( + Map<String, Set<Integer>> assignment, + int expectedEpoch + ) { + removeTaskEpochs(assignment, currentActiveTasksEpoch, expectedEpoch); + } + + void removeStandbyTaskEpochs( + Map<String, Set<Integer>> assignment, + int expectedEpoch + ) { + removeTaskEpochs(assignment, currentStandbyTasksEpoch, expectedEpoch); + } + + void removeWarmupTaskEpochs( + Map<String, Set<Integer>> assignment, + int expectedEpoch + ) { + removeTaskEpochs(assignment, currentWarmupTasksEpoch, expectedEpoch); + } + + /** + * Removes the task epochs based on the provided assignment. + * + * @param assignment The assignment. + * @param expectedEpoch The expected epoch. + * @throws IllegalStateException if the epoch does not match the expected one. + * package-private for testing. + */ + private void removeTaskEpochs( + Map<String, Set<Integer>> assignment, + TimelineHashMap<String, TimelineHashMap<Integer, Integer>> currentTasksEpoch, + int expectedEpoch + ) { + assignment.forEach((subtopologyId, assignedPartitions) -> { + currentTasksEpoch.compute(subtopologyId, (__, partitionsOrNull) -> { + if (partitionsOrNull != null) { + assignedPartitions.forEach(partitionId -> { + Integer prevValue = partitionsOrNull.remove(partitionId); + if (prevValue != expectedEpoch) { + throw new IllegalStateException( + String.format("Cannot remove the epoch %d from task %s_%s because the partition is " + + "still owned at a different epoch %d", expectedEpoch, subtopologyId, partitionId, prevValue)); + } + }); + if (partitionsOrNull.isEmpty()) { + return null; + } else { + return partitionsOrNull; + } + } else { + throw new IllegalStateException( + String.format("Cannot remove the epoch %d from %s because it does not have any epoch", + expectedEpoch, subtopologyId)); + } + }); + }); + } + + /** + * Adds the partitions epoch based on the provided assignment. + * + * @param activeTasks The assigned active tasks. + * @param standbyTasks The assigned standby tasks. + * @param warmupTasks The assigned warmup tasks. + * @param epoch The new epoch. + * @throws IllegalStateException if the partition already has an epoch assigned. + * package-private for testing. + */ + void addTaskEpochs( + Map<String, Set<Integer>> activeTasks, + Map<String, Set<Integer>> standbyTasks, + Map<String, Set<Integer>> warmupTasks, + int epoch + ) { + addTaskEpochs(activeTasks, epoch, currentActiveTasksEpoch); + addTaskEpochs(standbyTasks, epoch, currentStandbyTasksEpoch); + addTaskEpochs(warmupTasks, epoch, currentWarmupTasksEpoch); + } + + void addTaskEpochs( + Map<String, Set<Integer>> tasks, + int epoch, + TimelineHashMap<String, TimelineHashMap<Integer, Integer>> currentTasksEpoch + ) { + tasks.forEach((subtopologyId, assignedTaskPartitions) -> { + currentTasksEpoch.compute(subtopologyId, (__, partitionsOrNull) -> { + if (partitionsOrNull == null) { + partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedTaskPartitions.size()); + } + for (Integer partitionId : assignedTaskPartitions) { + Integer prevValue = partitionsOrNull.put(partitionId, epoch); + if (prevValue != null) { + throw new IllegalStateException( + String.format("Cannot set the epoch of %s-%s to %d because the partition is " + + "still owned at epoch %d", subtopologyId, partitionId, epoch, prevValue)); + } + } + return partitionsOrNull; + }); + }); + } + + /** + * Decrements value by 1; returns null when reaching zero. This helper is + * meant to be used with Map#compute. + */ + private static Integer decValue(String key, Integer value) { + if (value == null) return null; + return value == 1 ? null : value - 1; + } + + /** + * Increments value by 1; This helper is meant to be used with Map#compute. + */ + private static Integer incValue(String key, Integer value) { + return value == null ? 1 : value + 1; + } + + /** + * Populate the record list with the records needed to create the given Streams group. + * + * @param records The list to which the new records are added. + */ + public void createStreamsGroupRecords( + List<CoordinatorRecord> records + ) { + members().forEach((__, streamsGroupMember) -> + records.add(CoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId(), streamsGroupMember)) + ); + + records.add(CoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId(), groupEpoch())); + + members().forEach((streamsGroupMemberId, streamsGroupMember) -> + records.add(CoordinatorRecordHelpers.newTargetAssignmentRecord( + groupId(), + streamsGroupMemberId, + targetAssignment(streamsGroupMemberId).activeTasks(), + targetAssignment(streamsGroupMemberId).standbyTasks(), + targetAssignment(streamsGroupMemberId).warmupTasks() + )) + ); + + records.add(CoordinatorRecordHelpers.newStreamsTargetAssignmentEpochRecord(groupId(), groupEpoch())); + + members().forEach((__, streamsGroupMember) -> + records.add(CoordinatorRecordHelpers.newCurrentAssignmentRecord(groupId(), streamsGroupMember)) + ); + } + + /** + * @return The map of topic id and partition set converted from the list of TopicPartition. + */ + private static Map<Uuid, Set<Integer>> topicPartitionMapFromList( + List<TopicPartition> partitions, + TopicsImage topicsImage + ) { + Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>(); + partitions.forEach(topicPartition -> { + TopicImage topicImage = topicsImage.getTopic(topicPartition.topic()); + if (topicImage != null) { + topicPartitionMap + .computeIfAbsent(topicImage.id(), __ -> new HashSet<>()) + .add(topicPartition.partition()); + } + }); + return topicPartitionMap; + } + + /** + * Checks whether the member has any unreleased tasks. + * + * @param member The member to check. + * @return A boolean indicating whether the member has partitions in the target + * assignment that hasn't been revoked by other members. + */ + public boolean waitingOnUnreleasedActiveTasks(StreamsGroupMember member) { + if (member.state() == MemberState.UNRELEASED_TASKS) { + for (Map.Entry<String, Set<Integer>> entry : targetAssignment().get(member.memberId()).activeTasks().entrySet()) { + String subtopologyId = entry.getKey(); + Set<Integer> assignedActiveTasks = member.assignedActiveTasks().getOrDefault(subtopologyId, Collections.emptySet()); + + for (int partition : entry.getValue()) { + if (!assignedActiveTasks.contains(partition) && currentActiveTaskEpoch(subtopologyId, partition) != -1) { + return true; + } + } + } + } + return false; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java new file mode 100644 index 00000000000..0379a58bb2c --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java @@ -0,0 +1,704 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * StreamsGroupMember contains all the information related to a member + * within a Streams group. This class is immutable and is fully backed + * by records stored in the __consumer_offsets topic. + */ +public class StreamsGroupMember { + + /** + * A builder that facilitates the creation of a new member or the update of + * an existing one. + * + * Please refer to the javadoc of {{@link StreamsGroupMember}} for the + * definition of the fields. + */ + public static class Builder { + private final String memberId; + private int memberEpoch = 0; + private int previousMemberEpoch = -1; + private MemberState state = MemberState.STABLE; + private String instanceId = null; + private String rackId = null; + private int rebalanceTimeoutMs = -1; + private String clientId = ""; + private String clientHost = ""; + private byte[] topologyHash; + private String assignor; + private String processId; + private StreamsGroupMemberMetadataValue.HostInfo hostInfo; + private List<StreamsGroupMemberMetadataValue.KeyValue> clientTags; + private byte[] userData; + private List<StreamsGroupMemberMetadataValue.KeyValue> assignmentConfigs; + private Map<String, Set<Integer>> assignedActiveTasks = Collections.emptyMap(); + private Map<String, Set<Integer>> assignedStandbyTasks = Collections.emptyMap(); + private Map<String, Set<Integer>> assignedWarmupTasks = Collections.emptyMap(); + private Map<String, Set<Integer>> activeTasksPendingRevocation = Collections.emptyMap(); + + public Builder(String memberId) { + this.memberId = Objects.requireNonNull(memberId); + } + + public Builder(StreamsGroupMember member) { + Objects.requireNonNull(member); + + this.memberId = member.memberId; + this.memberEpoch = member.memberEpoch; + this.previousMemberEpoch = member.previousMemberEpoch; + this.instanceId = member.instanceId; + this.rackId = member.rackId; + this.rebalanceTimeoutMs = member.rebalanceTimeoutMs; + this.clientId = member.clientId; + this.clientHost = member.clientHost; + this.topologyHash = member.topologyHash; + this.assignor = member.assignor; + this.processId = member.processId; + this.hostInfo = member.hostInfo; + this.clientTags = member.clientTags; + this.userData = member.userData; + this.assignmentConfigs = member.assignmentConfigs; + this.state = member.state; + this.assignedActiveTasks = member.assignedActiveTasks; + this.assignedStandbyTasks = member.assignedStandbyTasks; + this.assignedWarmupTasks = member.assignedWarmupTasks; + this.activeTasksPendingRevocation = member.activeTasksPendingRevocation; + } + + public Builder updateMemberEpoch(int memberEpoch) { + int currentMemberEpoch = this.memberEpoch; + this.memberEpoch = memberEpoch; + this.previousMemberEpoch = currentMemberEpoch; + return this; + } + + public Builder setMemberEpoch(int memberEpoch) { + this.memberEpoch = memberEpoch; + return this; + } + + public Builder setPreviousMemberEpoch(int previousMemberEpoch) { + this.previousMemberEpoch = previousMemberEpoch; + return this; + } + + public Builder setInstanceId(String instanceId) { + this.instanceId = instanceId; + return this; + } + + public Builder maybeUpdateInstanceId(Optional<String> instanceId) { + this.instanceId = instanceId.orElse(this.instanceId); + return this; + } + + public Builder setRackId(String rackId) { + this.rackId = rackId; + return this; + } + + public Builder maybeUpdateRackId(Optional<String> rackId) { + this.rackId = rackId.orElse(this.rackId); + return this; + } + + public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) { + this.rebalanceTimeoutMs = rebalanceTimeoutMs; + return this; + } + + public Builder maybeUpdateRebalanceTimeoutMs(OptionalInt rebalanceTimeoutMs) { + this.rebalanceTimeoutMs = rebalanceTimeoutMs.orElse(this.rebalanceTimeoutMs); + return this; + } + + public StreamsGroupMember.Builder maybeUpdateAssignor(Optional<String> assignor) { + this.assignor = assignor.orElse(this.assignor); + return this; + } + + public Builder setClientId(String clientId) { + this.clientId = clientId; + return this; + } + + public Builder setClientHost(String clientHost) { + this.clientHost = clientHost; + return this; + } + + public Builder setState(MemberState state) { + this.state = state; + return this; + } + + public Builder setTopologyHash(byte[] topologyHash) { + this.topologyHash = topologyHash; + return this; + } + + public Builder setAssignor(String assignor) { + this.assignor = assignor; + return this; + } + + public Builder setProcessId(String processId) { + this.processId = processId; + return this; + } + + public Builder setHostInfo(StreamsGroupMemberMetadataValue.HostInfo hostInfo) { + this.hostInfo = hostInfo; + return this; + } + + public Builder setClientTags(List<StreamsGroupMemberMetadataValue.KeyValue> clientTags) { + this.clientTags = clientTags; + return this; + } + + public Builder setUserData(byte[] userData) { + this.userData = userData; + return this; + } + + public Builder setAssignmentConfigs(List<StreamsGroupMemberMetadataValue.KeyValue> assignmentConfigs) { + this.assignmentConfigs = assignmentConfigs; + return this; + } + + public Builder setAssignedActiveTasks(Map<String, Set<Integer>> assignedActiveTasks) { + this.assignedActiveTasks = assignedActiveTasks; + return this; + } + + public Builder setAssignedStandbyTasks(Map<String, Set<Integer>> assignedStandbyTasks) { + this.assignedStandbyTasks = assignedStandbyTasks; + return this; + } + + public Builder setAssignedWarmupTasks(Map<String, Set<Integer>> assignedWarmupTasks) { + this.assignedWarmupTasks = assignedWarmupTasks; + return this; + } + + public Builder setActiveTasksPendingRevocation(Map<String, Set<Integer>> activeTasksPendingRevocation) { + this.activeTasksPendingRevocation = activeTasksPendingRevocation; + return this; + } + + public Builder updateWith(StreamsGroupMemberMetadataValue record) { + setInstanceId(record.instanceId()); + setRackId(record.rackId()); + setClientId(record.clientId()); + setClientHost(record.clientHost()); + setRebalanceTimeoutMs(record.rebalanceTimeoutMs()); + setTopologyHash(record.topologyHash()); + setAssignor(record.assignor()); + setProcessId(record.processId()); + setHostInfo(record.hostInfo()); + setClientTags(record.clientTags()); + setUserData(record.userData()); + setAssignmentConfigs(record.assignmentConfigs()); + return this; + } + + public Builder updateWith(StreamsGroupCurrentMemberAssignmentValue record) { + setMemberEpoch(record.memberEpoch()); + setPreviousMemberEpoch(record.previousMemberEpoch()); + setState(MemberState.fromValue(record.state())); + setAssignedActiveTasks(assignmentFromTaskIds(record.activeTasks())); + setAssignedStandbyTasks(assignmentFromTaskIds(record.standbyTasks())); + setAssignedWarmupTasks(assignmentFromTaskIds(record.warmupTasks())); + setActiveTasksPendingRevocation(assignmentFromTaskIds(record.activeTasksPendingRevocation())); + return this; + } + + private Map<String, Set<Integer>> assignmentFromTaskIds( + List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> topicPartitionsList + ) { + return topicPartitionsList.stream().collect(Collectors.toMap( + StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopology, + taskIds -> Collections.unmodifiableSet(new HashSet<>(taskIds.partitions())))); + } + + public StreamsGroupMember build() { + return new StreamsGroupMember( + memberId, + memberEpoch, + previousMemberEpoch, + instanceId, + rackId, + rebalanceTimeoutMs, + clientId, + clientHost, + topologyHash, + assignor, + processId, + hostInfo, + clientTags, + userData, + assignmentConfigs, + state, + assignedActiveTasks, + assignedStandbyTasks, + assignedWarmupTasks, + activeTasksPendingRevocation + ); + } + } + + /** + * The member id. + */ + private final String memberId; + + /** + * The current member epoch. + */ + private final int memberEpoch; + + /** + * The previous member epoch. + */ + private final int previousMemberEpoch; + + /** + * The member state. + */ + private final MemberState state; + + /** + * The instance id provided by the member. + */ + private final String instanceId; + + /** + * The rack id provided by the member. + */ + private final String rackId; + + /** + * The rebalance timeout provided by the member. + */ + private final int rebalanceTimeoutMs; + + /** + * The client id reported by the member. + */ + private final String clientId; + + /** + * The host reported by the member. + */ + private final String clientHost; + + /** + * The topology hash + */ + private byte[] topologyHash; + + /** + * The assignor + */ + private String assignor; + + /** + * The process ID + */ + private String processId; + + /** + * The host info + */ + private StreamsGroupMemberMetadataValue.HostInfo hostInfo; + + /** + * The client tags + */ + private List<StreamsGroupMemberMetadataValue.KeyValue> clientTags; + + /** + * The user data + */ + private byte[] userData; + + /** + * The assignment configs + */ + private List<StreamsGroupMemberMetadataValue.KeyValue> assignmentConfigs; + + /** + * Active tasks assigned to this member. + */ + private final Map<String, Set<Integer>> assignedActiveTasks; + + /** + * Standby tasks assigned to this member. + */ + private final Map<String, Set<Integer>> assignedStandbyTasks; + + /** + * Warmup tasks assigned to this member. + */ + private final Map<String, Set<Integer>> assignedWarmupTasks; + + /** + * Active tasks being revoked by this member. + */ + private final Map<String, Set<Integer>> activeTasksPendingRevocation; + + private StreamsGroupMember( + String memberId, + int memberEpoch, + int previousMemberEpoch, + String instanceId, + String rackId, + int rebalanceTimeoutMs, + String clientId, + String clientHost, + byte[] topologyHash, + String assignor, + String processId, + StreamsGroupMemberMetadataValue.HostInfo hostInfo, + List<StreamsGroupMemberMetadataValue.KeyValue> clientTags, + byte[] userData, + List<StreamsGroupMemberMetadataValue.KeyValue> assignmentConfigs, + MemberState state, + Map<String, Set<Integer>> assignedActiveTasks, + Map<String, Set<Integer>> assignedStandbyTasks, + Map<String, Set<Integer>> assignedWarmupTasks, + Map<String, Set<Integer>> activeTasksPendingRevocation + ) { + this.memberId = memberId; + this.memberEpoch = memberEpoch; + this.previousMemberEpoch = previousMemberEpoch; + this.state = state; + this.instanceId = instanceId; + this.rackId = rackId; + this.rebalanceTimeoutMs = rebalanceTimeoutMs; + this.clientId = clientId; + this.clientHost = clientHost; + this.topologyHash = topologyHash; + this.assignor = assignor; + this.processId = processId; + this.hostInfo = hostInfo; + this.clientTags = clientTags; + this.userData = userData; + this.assignmentConfigs = assignmentConfigs; + this.assignedActiveTasks = assignedActiveTasks; + this.assignedStandbyTasks = assignedStandbyTasks; + this.assignedWarmupTasks = assignedWarmupTasks; + this.activeTasksPendingRevocation = activeTasksPendingRevocation; + } + + /** + * @return The member id. + */ + public String memberId() { + return memberId; + } + + /** + * @return The current member epoch. + */ + public int memberEpoch() { + return memberEpoch; + } + + /** + * @return The previous member epoch. + */ + public int previousMemberEpoch() { + return previousMemberEpoch; + } + + /** + * @return The instance id. + */ + public String instanceId() { + return instanceId; + } + + /** + * @return The rack id. + */ + public String rackId() { + return rackId; + } + + /** + * @return The rebalance timeout in millis. + */ + public int rebalanceTimeoutMs() { + return rebalanceTimeoutMs; + } + + /** + * @return The client id. + */ + public String clientId() { + return clientId; + } + + /** + * @return The client host. + */ + public String clientHost() { + return clientHost; + } + + /** + * @return The process ID + */ + public byte[] topologyHash() { + return topologyHash; + } + + /** + * @return The assignor + */ + public Optional<String> assignor() { + return Optional.ofNullable(assignor); + } + + /** + * @return The process ID + */ + public String processId() { + return processId; + } + + /** + * @return The host info + */ + public StreamsGroupMemberMetadataValue.HostInfo hostInfo() { + return hostInfo; + } + + /** + * @return The client tags + */ + public List<StreamsGroupMemberMetadataValue.KeyValue> clientTags() { + return clientTags; + } + + public byte[] userData() { + return userData; + } + + /** + * @return The assignment configs + */ + public List<StreamsGroupMemberMetadataValue.KeyValue> assignmentConfigs() { + return assignmentConfigs; + } + + /** + * @return The current state. + */ + public MemberState state() { + return state; + } + + /** + * @return True if the member is in the Stable state and at the desired epoch. + */ + public boolean isReconciledTo(int targetAssignmentEpoch) { + return state == MemberState.STABLE && memberEpoch == targetAssignmentEpoch; + } + + /** + * @return The set of assigned active tasks. + */ + public Map<String, Set<Integer>> assignedActiveTasks() { + return assignedActiveTasks; + } + + /** + * @return The set of assigned standby tasks. + */ + public Map<String, Set<Integer>> assignedStandbyTasks() { + return assignedStandbyTasks; + } + + /** + * @return The set of assigned warm-up tasks. + */ + public Map<String, Set<Integer>> assignedWarmupTasks() { + return assignedWarmupTasks; + } + + /** + * @return The set of active tasks awaiting revocation from the member. + */ + public Map<String, Set<Integer>> activeTasksPendingRevocation() { + return activeTasksPendingRevocation; + } + + private static List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitionsFromMap( + Map<Uuid, Set<Integer>> partitions, + TopicsImage topicsImage + ) { + List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitions = new ArrayList<>(); + partitions.forEach((topicId, partitionSet) -> { + String topicName = lookupTopicNameById(topicId, topicsImage); + if (topicName != null) { + topicPartitions.add(new ConsumerGroupDescribeResponseData.TopicPartitions() + .setTopicId(topicId) + .setTopicName(topicName) + .setPartitions(new ArrayList<>(partitionSet))); + } + }); + return topicPartitions; + } + + private static String lookupTopicNameById( + Uuid topicId, + TopicsImage topicsImage + ) { + TopicImage topicImage = topicsImage.getTopic(topicId); + if (topicImage != null) { + return topicImage.name(); + } else { + return null; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StreamsGroupMember that = (StreamsGroupMember) o; + return memberEpoch == that.memberEpoch + && previousMemberEpoch == that.previousMemberEpoch + && rebalanceTimeoutMs == that.rebalanceTimeoutMs + && Objects.equals(memberId, that.memberId) + && state == that.state + && Objects.equals(instanceId, that.instanceId) + && Objects.equals(rackId, that.rackId) + && Objects.equals(clientId, that.clientId) + && Objects.equals(clientHost, that.clientHost) + && Objects.deepEquals(topologyHash, that.topologyHash) + && Objects.equals(assignor, that.assignor) + && Objects.equals(processId, that.processId) + && Objects.equals(hostInfo, that.hostInfo) + && Objects.equals(clientTags, that.clientTags) + && Objects.deepEquals(userData, that.userData) + && Objects.equals(assignmentConfigs, that.assignmentConfigs) + && Objects.equals(assignedActiveTasks, that.assignedActiveTasks) + && Objects.equals(assignedStandbyTasks, that.assignedStandbyTasks) + && Objects.equals(assignedWarmupTasks, that.assignedWarmupTasks) + && Objects.equals(activeTasksPendingRevocation, that.activeTasksPendingRevocation); + } + + @Override + public int hashCode() { + int result = memberId != null ? memberId.hashCode() : 0; + result = 31 * result + memberEpoch; + result = 31 * result + previousMemberEpoch; + result = 31 * result + Objects.hashCode(state); + result = 31 * result + Objects.hashCode(instanceId); + result = 31 * result + Objects.hashCode(rackId); + result = 31 * result + rebalanceTimeoutMs; + result = 31 * result + Objects.hashCode(clientId); + result = 31 * result + Objects.hashCode(clientHost); + result = 31 * result + Arrays.hashCode(topologyHash); + result = 31 * result + Objects.hashCode(assignor); + result = 31 * result + Objects.hashCode(processId); + result = 31 * result + Objects.hashCode(hostInfo); + result = 31 * result + Objects.hashCode(clientTags); + result = 31 * result + Arrays.hashCode(userData); + result = 31 * result + Objects.hashCode(assignmentConfigs); + result = 31 * result + Objects.hashCode(assignedActiveTasks); + result = 31 * result + Objects.hashCode(assignedStandbyTasks); + result = 31 * result + Objects.hashCode(assignedWarmupTasks); + result = 31 * result + Objects.hashCode(activeTasksPendingRevocation); + return result; + } + + @Override + public String toString() { + return "StreamsGroupMember(" + + "memberId='" + memberId + '\'' + + ", memberEpoch=" + memberEpoch + + ", previousMemberEpoch=" + previousMemberEpoch + + ", state='" + state + '\'' + + ", instanceId='" + instanceId + '\'' + + ", rackId='" + rackId + '\'' + + ", rebalanceTimeoutMs=" + rebalanceTimeoutMs + + ", clientId='" + clientId + '\'' + + ", clientHost='" + clientHost + '\'' + + ", assignedActiveTasks=" + assignedActiveTasks + + ", assignedStandbyTasks=" + assignedStandbyTasks + + ", assignedWarmupTasks=" + assignedWarmupTasks + + ", activeTasksPendingRevocation=" + activeTasksPendingRevocation + + ')'; + } + + /** + * @return True if the two provided members have different assigned active tasks. + */ + public static boolean hasAssignedActiveTasksChanged( + StreamsGroupMember member1, + StreamsGroupMember member2 + ) { + return !member1.assignedActiveTasks().equals(member2.assignedActiveTasks()); + } + + /** + * @return True if the two provided members have different assigned active tasks. + */ + public static boolean hasAssignedStandbyTasksChanged( + StreamsGroupMember member1, + StreamsGroupMember member2 + ) { + return !member1.assignedStandbyTasks().equals(member2.assignedStandbyTasks()); + } + + /** + * @return True if the two provided members have different assigned active tasks. + */ + public static boolean hasAssignedWarmupTasksChanged( + StreamsGroupMember member1, + StreamsGroupMember member2 + ) { + return !member1.assignedWarmupTasks().equals(member2.assignedWarmupTasks()); + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/SubscribedTopicMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/SubscribedTopicMetadata.java new file mode 100644 index 00000000000..fb9b1133637 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/SubscribedTopicMetadata.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * The subscribed topic metadata class is used by the {@link org.apache.kafka.coordinator.group.assignor.PartitionAssignor} to obtain + * topic and partition metadata for the topics that the consumer group is subscribed to. + */ +public class SubscribedTopicMetadata implements SubscribedTopicDescriber { + /** + * The topic Ids mapped to their corresponding {@link org.apache.kafka.coordinator.group.consumer.TopicMetadata} + * object, which contains topic and partition metadata. + */ + private final Map<Uuid, org.apache.kafka.coordinator.group.consumer.TopicMetadata> topicMetadata; + + public SubscribedTopicMetadata(Map<Uuid, org.apache.kafka.coordinator.group.consumer.TopicMetadata> topicMetadata) { + this.topicMetadata = Objects.requireNonNull(topicMetadata); + } + + /** + * Map of topic Ids to topic metadata. + * + * @return The map of topic Ids to topic metadata. + */ + public Map<Uuid, org.apache.kafka.coordinator.group.consumer.TopicMetadata> topicMetadata() { + return this.topicMetadata; + } + + /** + * The number of partitions for the given topic Id. + * + * @param topicId Uuid corresponding to the topic. + * @return The number of partitions corresponding to the given topic Id, + * or -1 if the topic Id does not exist. + */ + @Override + public int numPartitions(Uuid topicId) { + org.apache.kafka.coordinator.group.consumer.TopicMetadata topic = this.topicMetadata.get(topicId); + return topic == null ? -1 : topic.numPartitions(); + } + + /** + * Returns all the available racks associated with the replicas of the given partition. + * + * @param topicId Uuid corresponding to the partition's topic. + * @param partition Partition Id within the topic. + * @return The set of racks corresponding to the replicas of the topics partition. + * If the topic Id does not exist or no partition rack information is available, an empty set is returned. + */ + @Override + public Set<String> racksForPartition(Uuid topicId, int partition) { + TopicMetadata topic = this.topicMetadata.get(topicId); + return topic == null ? Collections.emptySet() : topic.partitionRacks().getOrDefault(partition, Collections.emptySet()); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SubscribedTopicMetadata that = (SubscribedTopicMetadata) o; + return topicMetadata.equals(that.topicMetadata); + } + + @Override + public int hashCode() { + return topicMetadata.hashCode(); + } + + @Override + public String toString() { + return "SubscribedTopicMetadata(" + + "topicMetadata=" + topicMetadata + + ')'; + } +} diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json index 0cbe1ba1e4e..aaaccf7d865 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json @@ -32,7 +32,7 @@ "about": "Currently assigned standby tasks for this streams client." }, { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskIds", "about": "Currently assigned warming up tasks for this streams client." }, - { "name": "ActiveTasksPendingRevocation", "versions": "0+", "type": "[]TaskId", + { "name": "ActiveTasksPendingRevocation", "versions": "0+", "type": "[]TaskIds", "about": "The active tasks that must be revoked by this member." } ], "commonStructs": [ diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java new file mode 100644 index 00000000000..5041aa9cf07 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Optional; +import java.util.OptionalInt; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkStreamsAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTaskAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class StreamsGroupMemberTest { + + @Test + public void testNewMember() { + String subtopologyId1 = "subtopology-1"; + String subtopologyId2 = "subtopology-2"; + + StreamsGroupMember member = new StreamsGroupMember.Builder("member-id") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setInstanceId("instance-id") + .setRackId("rack-id") + .setRebalanceTimeoutMs(5000) + .setClientId("client-id") + .setClientHost("hostname") + .setTopologyHash("topology-hash".getBytes()) + .setAssignor("assignor") + .setProcessId("process-id") + .setHostInfo(new StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090)) + .setClientTags(Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag"))) + .setUserData("user-data".getBytes()) + .setAssignmentConfigs(Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config"))) + .setAssignedActiveTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 1, 2, 3))) + .setAssignedStandbyTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 6, 5, 4))) + .setAssignedWarmupTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 7, 8, 9))) + .setActiveTasksPendingRevocation(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 3, 2, 1))) + .build(); + + assertEquals("member-id", member.memberId()); + assertEquals(10, member.memberEpoch()); + assertEquals(9, member.previousMemberEpoch()); + assertEquals("instance-id", member.instanceId()); + assertEquals("rack-id", member.rackId()); + assertEquals("client-id", member.clientId()); + assertEquals("hostname", member.clientHost()); + assertTrue(Arrays.equals("topology-hash".getBytes(), member.topologyHash())); + assertEquals("assignor", member.assignor().get()); + assertEquals("process-id", member.processId()); + assertEquals(new StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090), member.hostInfo()); + assertEquals( + Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag")), + member.clientTags() + ); + assertTrue(Arrays.equals("user-data".getBytes(), member.userData())); + assertEquals( + Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config")), + member.assignmentConfigs() + ); + assertEquals( + mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 1, 2, 3)), + member.assignedActiveTasks() + ); + assertEquals( + mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 6, 5, 4)), + member.assignedStandbyTasks() + ); + assertEquals( + mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 7, 8, 9)), + member.assignedWarmupTasks() + ); + assertEquals( + mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 3, 2, 1)), + member.activeTasksPendingRevocation() + ); + } + + @Test + public void testEquals() { + String subtopologyId1 = "subtopology-1"; + String subtopologyId2 = "subtopology-2"; + + StreamsGroupMember member1 = new StreamsGroupMember.Builder("member-id") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setInstanceId("instance-id") + .setRackId("rack-id") + .setRebalanceTimeoutMs(5000) + .setClientId("client-id") + .setClientHost("hostname") + .setTopologyHash("topology-hash".getBytes()) + .setAssignor("assignor") + .setProcessId("process-id") + .setHostInfo(new StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090)) + .setClientTags(Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag"))) + .setUserData("user-data".getBytes()) + .setAssignmentConfigs(Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config"))) + .setAssignedActiveTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 1, 2, 3))) + .setAssignedStandbyTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 6, 5, 4))) + .setAssignedWarmupTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 7, 8, 9))) + .setActiveTasksPendingRevocation(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 3, 2, 1))) + .build(); + + StreamsGroupMember member2 = new StreamsGroupMember.Builder("member-id") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setInstanceId("instance-id") + .setRackId("rack-id") + .setRebalanceTimeoutMs(5000) + .setClientId("client-id") + .setClientHost("hostname") + .setTopologyHash("topology-hash".getBytes()) + .setAssignor("assignor") + .setProcessId("process-id") + .setHostInfo(new StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090)) + .setClientTags(Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag"))) + .setUserData("user-data".getBytes()) + .setAssignmentConfigs(Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config"))) + .setAssignedActiveTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 1, 2, 3))) + .setAssignedStandbyTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 6, 5, 4))) + .setAssignedWarmupTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 7, 8, 9))) + .setActiveTasksPendingRevocation(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 3, 2, 1))) + .build(); + + StreamsGroupMember member3 = new StreamsGroupMember.Builder("member3-id") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setInstanceId("instance-id") + .setRackId("rack-id") + .setRebalanceTimeoutMs(5000) + .setClientId("client-id") + .setClientHost("hostname") + .setTopologyHash("topology-hash".getBytes()) + .setAssignor("assignor") + .setProcessId("process-id") + .setHostInfo(new StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090)) + .setClientTags(Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag"))) + .setUserData("user-data".getBytes()) + .setAssignmentConfigs(Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config"))) + .setAssignedActiveTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 1, 2, 3))) + .setAssignedStandbyTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 6, 5, 4))) + .setAssignedWarmupTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 7, 8, 9))) + .setActiveTasksPendingRevocation(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 3, 2, 1))) + .build(); + + assertEquals(member1, member2); + assertNotEquals(member1, member3); + } + + @Test + public void testUpdateMember() { + String subtopologyId1 = "subtopology-1"; + String subtopologyId2 = "subtopology-2"; + + StreamsGroupMember member = new StreamsGroupMember.Builder("member-id") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setInstanceId("instance-id") + .setRackId("rack-id") + .setRebalanceTimeoutMs(5000) + .setClientId("client-id") + .setClientHost("hostname") + .setTopologyHash("topology-hash".getBytes()) + .setAssignor("assignor") + .setProcessId("process-id") + .setHostInfo(new StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090)) + .setClientTags(Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag"))) + .setUserData("user-data".getBytes()) + .setAssignmentConfigs(Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config"))) + .setAssignedActiveTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 1, 2, 3))) + .setAssignedStandbyTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 6, 5, 4))) + .setAssignedWarmupTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 7, 8, 9))) + .setActiveTasksPendingRevocation(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 3, 2, 1))) + .build(); + + // This is a no-op. + StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member) + .maybeUpdateRackId(Optional.empty()) + .maybeUpdateInstanceId(Optional.empty()) + .maybeUpdateRebalanceTimeoutMs(OptionalInt.empty()) + .maybeUpdateAssignor(Optional.empty()) + .build(); + + assertEquals(member, updatedMember); + + updatedMember = new StreamsGroupMember.Builder(member) + .maybeUpdateRackId(Optional.of("new-rack-id")) + .maybeUpdateInstanceId(Optional.of("new-instance-id")) + .maybeUpdateRebalanceTimeoutMs(OptionalInt.of(6000)) + .maybeUpdateAssignor(Optional.of("new-assignor")) + .build(); + + assertEquals("new-instance-id", updatedMember.instanceId()); + assertEquals("new-rack-id", updatedMember.rackId()); + assertEquals(6000, updatedMember.rebalanceTimeoutMs()); + assertEquals("new-assignor", updatedMember.assignor()); + } + + @Test + public void testUpdateWithStreamsGroupMemberMetadataValue() { + StreamsGroupMemberMetadataValue record = new StreamsGroupMemberMetadataValue() + .setClientId("client-id") + .setClientHost("host-id") + .setInstanceId("instance-id") + .setRackId("rack-id") + .setRebalanceTimeoutMs(1000) + .setTopologyHash("topology-hash".getBytes()) + .setAssignor("assignor") + .setProcessId("process-id") + .setHostInfo(new StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090)) + .setClientTags(Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag"))) + .setUserData("user-data".getBytes()) + .setAssignmentConfigs(Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config"))); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member-id") + .updateWith(record) + .build(); + + assertEquals("instance-id", member.instanceId()); + assertEquals("rack-id", member.rackId()); + assertEquals("client-id", member.clientId()); + assertEquals("host-id", member.clientHost()); + assertEquals(1000, member.rebalanceTimeoutMs()); + assertTrue(Arrays.equals("topology-hash".getBytes(), member.topologyHash())); + assertEquals("assignor", member.assignor().get()); + assertEquals("process-id", member.processId()); + assertEquals(new StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090), member.hostInfo()); + assertEquals( + Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag")), + member.clientTags() + ); + assertTrue(Arrays.equals("user-data".getBytes(), member.userData())); + assertEquals( + Arrays.asList(new StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config")), + member.assignmentConfigs() + ); + } + + @Test + public void testUpdateWithConsumerGroupCurrentMemberAssignmentValue() { + String subtopologyId1 = "subtopology-1"; + String subtopologyId2 = "subtopology-2"; + + StreamsGroupCurrentMemberAssignmentValue record = new StreamsGroupCurrentMemberAssignmentValue() + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setState((byte) 1) + .setActiveTasks(Arrays.asList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopology(subtopologyId1) + .setPartitions(Arrays.asList(1, 2, 3))) + ) + .setStandbyTasks(Arrays.asList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopology(subtopologyId2) + .setPartitions(Arrays.asList(6, 5, 4))) + ) + .setWarmupTasks(Arrays.asList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopology(subtopologyId1) + .setPartitions(Arrays.asList(7, 8, 9))) + ) + .setActiveTasksPendingRevocation(Arrays.asList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopology(subtopologyId2) + .setPartitions(Arrays.asList(2, 3, 1))) + ); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member-id") + .updateWith(record) + .build(); + + assertEquals(10, member.memberEpoch()); + assertEquals(9, member.previousMemberEpoch()); + assertEquals(MemberState.STABLE, member.state()); + assertEquals(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 1, 2, 3)), member.assignedActiveTasks()); + assertEquals(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 6, 5, 4)), member.assignedStandbyTasks()); + assertEquals(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 7, 8, 9)), member.assignedWarmupTasks()); + assertEquals(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 2, 3, 1)), member.activeTasksPendingRevocation()); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java new file mode 100644 index 00000000000..d2f2f4fd22d --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -0,0 +1,730 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.StaleMemberEpochException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.coordinator.group.OffsetAndMetadata; +import org.apache.kafka.coordinator.group.OffsetExpirationCondition; +import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; + +import static java.util.Collections.emptyMap; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkStreamsAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTaskAssignment; +import static org.apache.kafka.server.immutable.ImmutableSet.singleton; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +public class StreamsGroupTest { + + private StreamsGroup createStreamsGroup(String groupId) { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + return new StreamsGroup( + snapshotRegistry, + groupId, + mock(GroupCoordinatorMetricsShard.class) + ); + } + + @Test + public void testGetOrCreateMember() { + StreamsGroup streamsGroup = createStreamsGroup("foo"); + StreamsGroupMember member; + + // Create a member. + member = streamsGroup.getOrMaybeCreateMember("member-id", true); + assertEquals("member-id", member.memberId()); + + // Add member to the group. + streamsGroup.updateMember(member); + + // Get that member back. + member = streamsGroup.getOrMaybeCreateMember("member-id", false); + assertEquals("member-id", member.memberId()); + + assertThrows(UnknownMemberIdException.class, () -> + streamsGroup.getOrMaybeCreateMember("does-not-exist", false)); + } + + @Test + public void testUpdateMember() { + StreamsGroup streamsGroup = createStreamsGroup("foo"); + StreamsGroupMember member; + + member = streamsGroup.getOrMaybeCreateMember("member", true); + + member = new StreamsGroupMember.Builder(member) + .setAssignor("client") + .build(); + + streamsGroup.updateMember(member); + + assertEquals(member, streamsGroup.getOrMaybeCreateMember("member", false)); + } + + @Test + public void testNoStaticMember() { + StreamsGroup streamsGroup = createStreamsGroup("foo"); + + // Create a new member which is not static + streamsGroup.getOrMaybeCreateMember("member", true); + assertNull(streamsGroup.staticMember("instance-id")); + } + + @Test + public void testGetStaticMemberByInstanceId() { + StreamsGroup streamsGroup = createStreamsGroup("foo"); + StreamsGroupMember member; + + member = streamsGroup.getOrMaybeCreateMember("member", true); + + member = new StreamsGroupMember.Builder(member) + .setInstanceId("instance") + .build(); + + streamsGroup.updateMember(member); + + assertEquals(member, streamsGroup.staticMember("instance")); + assertEquals(member, streamsGroup.getOrMaybeCreateMember("member", false)); + assertEquals(member.memberId(), streamsGroup.staticMemberId("instance")); + } + + @Test + public void testRemoveMember() { + StreamsGroup streamsGroup = createStreamsGroup("foo"); + + StreamsGroupMember member = streamsGroup.getOrMaybeCreateMember("member", true); + streamsGroup.updateMember(member); + assertTrue(streamsGroup.hasMember("member")); + + streamsGroup.removeMember("member"); + assertFalse(streamsGroup.hasMember("member")); + + } + + @Test + public void testRemoveStaticMember() { + StreamsGroup streamsGroup = createStreamsGroup("foo"); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setInstanceId("instance") + .build(); + + streamsGroup.updateMember(member); + assertTrue(streamsGroup.hasMember("member")); + + streamsGroup.removeMember("member"); + assertFalse(streamsGroup.hasMember("member")); + assertNull(streamsGroup.staticMember("instance")); + assertNull(streamsGroup.staticMemberId("instance")); + } + + @Test + public void testUpdatingMemberUpdatesPartitionEpoch() { + String fooSubtopology = "foo-sub"; + String barSubtopology = "bar-sub"; + String zarSubtopology = "zar-sub"; + + StreamsGroup streamsGroup = createStreamsGroup("foo"); + StreamsGroupMember member; + + member = new StreamsGroupMember.Builder("member") + .setMemberEpoch(10) + .setAssignedActiveTasks(mkStreamsAssignment( + mkTaskAssignment(fooSubtopology, 1, 2, 3))) + .setActiveTasksPendingRevocation(mkStreamsAssignment( + mkTaskAssignment(barSubtopology, 4, 5, 6))) + .build(); + + streamsGroup.updateMember(member); + + assertEquals(10, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 1)); + assertEquals(10, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 2)); + assertEquals(10, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 3)); + assertEquals(10, streamsGroup.currentActiveTaskEpoch(barSubtopology, 4)); + assertEquals(10, streamsGroup.currentActiveTaskEpoch(barSubtopology, 5)); + assertEquals(10, streamsGroup.currentActiveTaskEpoch(barSubtopology, 6)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 7)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 8)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 9)); + + member = new StreamsGroupMember.Builder(member) + .setMemberEpoch(11) + .setAssignedActiveTasks(mkStreamsAssignment( + mkTaskAssignment(barSubtopology, 1, 2, 3))) + .setActiveTasksPendingRevocation(mkStreamsAssignment( + mkTaskAssignment(zarSubtopology, 4, 5, 6))) + .build(); + + streamsGroup.updateMember(member); + + assertEquals(11, streamsGroup.currentActiveTaskEpoch(barSubtopology, 1)); + assertEquals(11, streamsGroup.currentActiveTaskEpoch(barSubtopology, 2)); + assertEquals(11, streamsGroup.currentActiveTaskEpoch(barSubtopology, 3)); + assertEquals(11, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 4)); + assertEquals(11, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 5)); + assertEquals(11, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 6)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(barSubtopology, 7)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(barSubtopology, 8)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(barSubtopology, 9)); + } + + @Test + public void testUpdatingMemberUpdatesPartitionEpochWhenPartitionIsReassignedBeforeBeingRevoked() { + String fooSubtopologyId = "foo-sub"; + + StreamsGroup streamsGroup = createStreamsGroup("foo"); + StreamsGroupMember member; + + member = new StreamsGroupMember.Builder("member") + .setMemberEpoch(10) + .setAssignedActiveTasks(emptyMap()) + .setAssignedStandbyTasks(emptyMap()) + .setAssignedWarmupTasks(emptyMap()) + .setActiveTasksPendingRevocation(mkStreamsAssignment( + mkTaskAssignment(fooSubtopologyId, 1))) + .build(); + + streamsGroup.updateMember(member); + + assertEquals(10, streamsGroup.currentActiveTaskEpoch(fooSubtopologyId, 1)); + + member = new StreamsGroupMember.Builder(member) + .setMemberEpoch(11) + .setAssignedActiveTasks(mkStreamsAssignment(mkTaskAssignment(fooSubtopologyId, 1))) + .setAssignedStandbyTasks(emptyMap()) + .setAssignedWarmupTasks(emptyMap()) + .setActiveTasksPendingRevocation(emptyMap()) + .build(); + + streamsGroup.updateMember(member); + + assertEquals(11, streamsGroup.currentActiveTaskEpoch(fooSubtopologyId, 1)); + } + + @Test + public void testUpdatingMemberUpdatesPartitionEpochWhenPartitionIsNotReleased() { + String fooSubtopologyId = "foo-sub"; + StreamsGroup streamsGroup = createStreamsGroup("foo"); + + StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1") + .setMemberEpoch(10) + .setAssignedActiveTasks(mkStreamsAssignment(mkTaskAssignment(fooSubtopologyId, 1))) + .setAssignedStandbyTasks(emptyMap()) + .setAssignedWarmupTasks(emptyMap()) + .build(); + + streamsGroup.updateMember(m1); + + StreamsGroupMember m2 = new StreamsGroupMember.Builder("m2") + .setMemberEpoch(10) + .setAssignedActiveTasks(mkStreamsAssignment(mkTaskAssignment(fooSubtopologyId, 1))) + .setAssignedStandbyTasks(emptyMap()) + .setAssignedWarmupTasks(emptyMap()) + .build(); + + // m2 should not be able to acquire foo-1 because the partition is + // still owned by another member. + assertThrows(IllegalStateException.class, () -> streamsGroup.updateMember(m2)); + } + + @Test + public void testRemoveTaskEpochs() { + String fooSubtopologyId = "foo-sub"; + StreamsGroup streamsGroup = createStreamsGroup("foo"); + + // Removing should fail because there is no epoch set. + assertThrows(IllegalStateException.class, () -> streamsGroup.removeActiveTaskEpochs( + mkStreamsAssignment(mkTaskAssignment(fooSubtopologyId, 1)), + 10 + )); + + StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1") + .setMemberEpoch(10) + .setAssignedActiveTasks(mkStreamsAssignment(mkTaskAssignment(fooSubtopologyId, 1))) + .build(); + + streamsGroup.updateMember(m1); + + // Removing should fail because the expected epoch is incorrect. + assertThrows(IllegalStateException.class, () -> streamsGroup.removeActiveTaskEpochs( + mkStreamsAssignment(mkTaskAssignment(fooSubtopologyId, 1)), + 11 + )); + } + + @Test + public void testAddPartitionEpochs() { + String fooSubtopologyId = "foo-sub"; + StreamsGroup streamsGroup = createStreamsGroup("foo"); + + streamsGroup.addTaskEpochs( + mkStreamsAssignment(mkTaskAssignment(fooSubtopologyId, 1)), + emptyMap(), + emptyMap(), + 10 + ); + + // Changing the epoch should fail because the owner of the partition + // should remove it first. + assertThrows(IllegalStateException.class, () -> streamsGroup.addTaskEpochs( + mkStreamsAssignment(mkTaskAssignment(fooSubtopologyId, 1)), + emptyMap(), + emptyMap(), + 11 + )); + } + + @Test + public void testDeletingMemberRemovesPartitionEpoch() { + String fooSubtopology = "foo-sub"; + String barSubtopology = "bar-sub"; + String zarSubtopology = "zar-sub"; + + StreamsGroup streamsGroup = createStreamsGroup("foo"); + StreamsGroupMember member; + + member = new StreamsGroupMember.Builder("member") + .setMemberEpoch(10) + .setAssignedActiveTasks(mkStreamsAssignment( + mkTaskAssignment(fooSubtopology, 1, 2, 3))) + .setActiveTasksPendingRevocation(mkStreamsAssignment( + mkTaskAssignment(barSubtopology, 4, 5, 6))) + .build(); + + streamsGroup.updateMember(member); + + assertEquals(10, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 1)); + assertEquals(10, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 2)); + assertEquals(10, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 3)); + assertEquals(10, streamsGroup.currentActiveTaskEpoch(barSubtopology, 4)); + assertEquals(10, streamsGroup.currentActiveTaskEpoch(barSubtopology, 5)); + assertEquals(10, streamsGroup.currentActiveTaskEpoch(barSubtopology, 6)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 7)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 8)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 9)); + + streamsGroup.removeMember(member.memberId()); + + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(barSubtopology, 1)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(barSubtopology, 2)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(barSubtopology, 3)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 4)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 5)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 6)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 7)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 8)); + assertEquals(-1, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 9)); + } + + @Test + public void testWaitingOnUnreleasedTask() { + String fooSubtopology = "foo-sub"; + String barSubtopology = "bar-sub"; + String zarSubtopology = "zar-sub"; + String memberId1 = "m1"; + String memberId2 = "m2"; + + StreamsGroup streamsGroup = createStreamsGroup("foo"); + streamsGroup.updateTargetAssignment(memberId1, + new Assignment( + mkStreamsAssignment( + mkTaskAssignment(fooSubtopology, 1, 2, 3), + mkTaskAssignment(zarSubtopology, 7, 8, 9) + ), + emptyMap(), + emptyMap()) + ); + + StreamsGroupMember member1 = new StreamsGroupMember.Builder(memberId1) + .setMemberEpoch(10) + .setState(MemberState.UNRELEASED_TASKS) + .setAssignedActiveTasks(mkStreamsAssignment( + mkTaskAssignment(fooSubtopology, 1, 2, 3) + )) + .setActiveTasksPendingRevocation(mkStreamsAssignment( + mkTaskAssignment(barSubtopology, 4, 5, 6) + )) + .build(); + streamsGroup.updateMember(member1); + + assertFalse(streamsGroup.waitingOnUnreleasedActiveTasks(member1)); + + StreamsGroupMember member2 = new StreamsGroupMember.Builder(memberId2) + .setMemberEpoch(10) + .setActiveTasksPendingRevocation(mkStreamsAssignment( + mkTaskAssignment(zarSubtopology, 7))) + .build(); + streamsGroup.updateMember(member2); + + assertTrue(streamsGroup.waitingOnUnreleasedActiveTasks(member1)); + } + + @Test + public void testGroupState() { + StreamsGroup streamsGroup = createStreamsGroup("foo"); + assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state()); + + StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1") + .setState(MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .build(); + + streamsGroup.updateMember(member1); + streamsGroup.setGroupEpoch(1); + + assertEquals(MemberState.STABLE, member1.state()); + assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, streamsGroup.state()); + + StreamsGroupMember member2 = new StreamsGroupMember.Builder("member2") + .setState(MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .build(); + + streamsGroup.updateMember(member2); + streamsGroup.setGroupEpoch(2); + + assertEquals(MemberState.STABLE, member2.state()); + assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, streamsGroup.state()); + + streamsGroup.setTargetAssignmentEpoch(2); + + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state()); + + member1 = new StreamsGroupMember.Builder(member1) + .setState(MemberState.STABLE) + .setMemberEpoch(2) + .setPreviousMemberEpoch(1) + .build(); + + streamsGroup.updateMember(member1); + + assertEquals(MemberState.STABLE, member1.state()); + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state()); + + // Member 2 is not stable so the group stays in reconciling state. + member2 = new StreamsGroupMember.Builder(member2) + .setState(MemberState.UNREVOKED_TASKS) + .setMemberEpoch(2) + .setPreviousMemberEpoch(1) + .build(); + + streamsGroup.updateMember(member2); + + assertEquals(MemberState.UNREVOKED_TASKS, member2.state()); + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state()); + + member2 = new StreamsGroupMember.Builder(member2) + .setState(MemberState.STABLE) + .setMemberEpoch(2) + .setPreviousMemberEpoch(1) + .build(); + + streamsGroup.updateMember(member2); + + assertEquals(MemberState.STABLE, member2.state()); + assertEquals(StreamsGroup.StreamsGroupState.STABLE, streamsGroup.state()); + + streamsGroup.removeMember("member1"); + streamsGroup.removeMember("member2"); + + assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state()); + } + + @Test + public void testUpdateInvertedAssignment() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class); + StreamsGroup streamsGroup = new StreamsGroup(snapshotRegistry, "test-group", metricsShard); + String subtopologyId = "foo-sub"; + String memberId1 = "member1"; + String memberId2 = "member2"; + + // Initial assignment for member1 + Assignment initialAssignment = new Assignment( + mkStreamsAssignment(mkTaskAssignment(subtopologyId, 0)), + emptyMap(), + emptyMap() + ); + streamsGroup.updateTargetAssignment(memberId1, initialAssignment); + + // Verify that partition 0 is assigned to member1. + assertEquals( + mkMap( + mkEntry(subtopologyId, mkMap(mkEntry(0, memberId1))) + ), + streamsGroup.invertedTargetActiveTasksAssignment() + ); + + // New assignment for member1 + Assignment newAssignment = new Assignment( + mkStreamsAssignment(mkTaskAssignment(subtopologyId, 1)), + emptyMap(), + emptyMap() + ); + streamsGroup.updateTargetAssignment(memberId1, newAssignment); + + // Verify that partition 0 is no longer assigned and partition 1 is assigned to member1 + assertEquals( + mkMap( + mkEntry(subtopologyId, mkMap(mkEntry(1, memberId1))) + ), + streamsGroup.invertedTargetActiveTasksAssignment() + ); + + // New assignment for member2 to add partition 1 + Assignment newAssignment2 = new Assignment( + mkStreamsAssignment(mkTaskAssignment(subtopologyId, 1)), + emptyMap(), + emptyMap() + ); + streamsGroup.updateTargetAssignment(memberId2, newAssignment); + + // Verify that partition 1 is assigned to member2 + assertEquals( + mkMap( + mkEntry(subtopologyId, mkMap(mkEntry(1, memberId2))) + ), + streamsGroup.invertedTargetActiveTasksAssignment() + ); + + // New assignment for member1 to revoke partition 1 and assign partition 0 + Assignment newAssignment1 = new Assignment( + mkStreamsAssignment(mkTaskAssignment(subtopologyId, 0)), + emptyMap(), + emptyMap() + ); + streamsGroup.updateTargetAssignment(memberId1, newAssignment1); + + // Verify that partition 1 is still assigned to member2 and partition 0 is assigned to member1 + assertEquals( + mkMap( + mkEntry(subtopologyId, mkMap( + mkEntry(0, memberId1), + mkEntry(1, memberId2) + )) + ), + streamsGroup.invertedTargetActiveTasksAssignment() + ); + + // Test remove target assignment for member1 + streamsGroup.removeTargetAssignment(memberId1); + + // Verify that partition 0 is no longer assigned and partition 1 is still assigned to member2 + assertEquals( + mkMap( + mkEntry(subtopologyId, mkMap(mkEntry(1, memberId2))) + ), + streamsGroup.invertedTargetActiveTasksAssignment() + ); + } + + @Test + public void testMetadataRefreshDeadline() { + MockTime time = new MockTime(); + StreamsGroup group = createStreamsGroup("group-foo"); + + // Group epoch starts at 0. + assertEquals(0, group.groupEpoch()); + + // The refresh time deadline should be empty when the group is created or loaded. + assertTrue(group.hasMetadataExpired(time.milliseconds())); + assertEquals(0L, group.metadataRefreshDeadline().deadlineMs); + assertEquals(0, group.metadataRefreshDeadline().epoch); + + // Set the refresh deadline. The metadata remains valid because the deadline + // has not past and the group epoch is correct. + group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch()); + assertFalse(group.hasMetadataExpired(time.milliseconds())); + assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs); + assertEquals(group.groupEpoch(), group.metadataRefreshDeadline().epoch); + + // Advance past the deadline. The metadata should have expired. + time.sleep(1001L); + assertTrue(group.hasMetadataExpired(time.milliseconds())); + + // Set the refresh time deadline with a higher group epoch. The metadata is considered + // as expired because the group epoch attached to the deadline is higher than the + // current group epoch. + group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch() + 1); + assertTrue(group.hasMetadataExpired(time.milliseconds())); + assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs); + assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch); + + // Advance the group epoch. + group.setGroupEpoch(group.groupEpoch() + 1); + + // Set the refresh deadline. The metadata remains valid because the deadline + // has not past and the group epoch is correct. + group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch()); + assertFalse(group.hasMetadataExpired(time.milliseconds())); + assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs); + assertEquals(group.groupEpoch(), group.metadataRefreshDeadline().epoch); + + // Request metadata refresh. The metadata expires immediately. + group.requestMetadataRefresh(); + assertTrue(group.hasMetadataExpired(time.milliseconds())); + assertEquals(0L, group.metadataRefreshDeadline().deadlineMs); + assertEquals(0, group.metadataRefreshDeadline().epoch); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testValidateOffsetCommit(boolean isTransactional) { + StreamsGroup group = createStreamsGroup("group-foo"); + + // Simulate a call from the admin client without member id and member epoch. + // This should pass only if the group is empty. + group.validateOffsetCommit("", "", -1, isTransactional); + + // The member does not exist. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("member-id", null, 0, isTransactional)); + + // Create a member. + group.updateMember(new StreamsGroupMember.Builder("member-id").build()); + + // A call from the admin client should fail as the group is not empty. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("", "", -1, isTransactional)); + + // The member epoch is stale. + assertThrows(StaleMemberEpochException.class, () -> + group.validateOffsetCommit("member-id", "", 10, isTransactional)); + + // This should succeed. + group.validateOffsetCommit("member-id", "", 0, isTransactional); + } + + @Test + public void testValidateOffsetFetch() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + StreamsGroup group = new StreamsGroup( + snapshotRegistry, + "group-foo", + mock(GroupCoordinatorMetricsShard.class) + ); + + // Simulate a call from the admin client without member id and member epoch. + group.validateOffsetFetch(null, -1, Long.MAX_VALUE); + + // The member does not exist. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE)); + + // Create a member. + snapshotRegistry.getOrCreateSnapshot(0); + group.updateMember(new StreamsGroupMember.Builder("member-id").build()); + + // The member does not exist at last committed offset 0. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetFetch("member-id", 0, 0)); + + // The member exists but the epoch is stale when the last committed offset is not considered. + assertThrows(StaleMemberEpochException.class, () -> + group.validateOffsetFetch("member-id", 10, Long.MAX_VALUE)); + + // This should succeed. + group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE); + } + + @Test + public void testValidateDeleteGroup() { + StreamsGroup streamsGroup = createStreamsGroup("foo"); + + assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state()); + assertDoesNotThrow(streamsGroup::validateDeleteGroup); + + StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1") + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .build(); + streamsGroup.updateMember(member1); + + assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state()); + assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup); + + streamsGroup.setGroupEpoch(1); + + assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, streamsGroup.state()); + assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup); + + streamsGroup.setTargetAssignmentEpoch(1); + + assertEquals(StreamsGroup.StreamsGroupState.STABLE, streamsGroup.state()); + assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup); + } + + @Test + public void testOffsetExpirationCondition() { + long currentTimestamp = 30000L; + long commitTimestamp = 20000L; + long offsetsRetentionMs = 10000L; + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty()); + StreamsGroup group = new StreamsGroup(new SnapshotRegistry(new LogContext()), "group-id", mock(GroupCoordinatorMetricsShard.class)); + + Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition(); + assertTrue(offsetExpirationCondition.isPresent()); + + OffsetExpirationConditionImpl condition = (OffsetExpirationConditionImpl) offsetExpirationCondition.get(); + assertEquals(commitTimestamp, condition.baseTimestamp().apply(offsetAndMetadata)); + assertTrue(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs)); + } + + @Test + public void testIsInStatesCaseInsensitive() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard( + snapshotRegistry, + emptyMap(), + new TopicPartition("__consumer_offsets", 0) + ); + StreamsGroup group = new StreamsGroup(snapshotRegistry, "group-foo", metricsShard); + snapshotRegistry.getOrCreateSnapshot(0); + assertTrue(group.isInStates(singleton("empty"), 0)); + assertFalse(group.isInStates(singleton("Empty"), 0)); + + group.updateMember(new StreamsGroupMember.Builder("member1") + .build()); + snapshotRegistry.getOrCreateSnapshot(1); + assertTrue(group.isInStates(singleton("empty"), 0)); + assertTrue(group.isInStates(singleton("stable"), 1)); + assertFalse(group.isInStates(singleton("empty"), 1)); + } +} \ No newline at end of file
