This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 624dd458099 KAFKA-18321: Add StreamsGroupMember, MemberState and
Assignment classes (#18276)
624dd458099 is described below
commit 624dd458099fa93b3fa1e1715b58bbc6d8689857
Author: Bruno Cadonna <[email protected]>
AuthorDate: Wed Jan 8 17:26:41 2025 +0100
KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment classes
(#18276)
* KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment classes
This commit adds the classes to represent a Streams group member in the
consumer coordinator.
Reviewers: Bill Bejeck <[email protected]>, Lucas Brutschy
<[email protected]>
---
.../coordinator/group/streams/Assignment.java | 88 ++++
.../coordinator/group/streams/MemberState.java | 74 ++++
.../group/streams/StreamsGroupMember.java | 463 +++++++++++++++++++++
.../coordinator/group/streams/AssignmentTest.java | 122 ++++++
.../group/streams/StreamsGroupMemberTest.java | 429 +++++++++++++++++++
.../group/streams/TaskAssignmentTestUtil.java | 57 +++
6 files changed, 1233 insertions(+)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
new file mode 100644
index 00000000000..da377d19ccd
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable assignment for a member.
+ * <p>
+ * The constructor rejects null maps and wraps each map in an unmodifiable view. The sets
+ * stored as map values are neither copied nor wrapped.
+ *
+ * @param activeTasks Active tasks assigned to the member.
+ * The key of the map is the subtopology ID and the value is the set of partition IDs.
+ * @param standbyTasks Standby tasks assigned to the member.
+ * The key of the map is the subtopology ID and the value is the set of partition IDs.
+ * @param warmupTasks Warm-up tasks assigned to the member.
+ * The key of the map is the subtopology ID and the value is the set of partition IDs.
+ */
+public record Assignment(Map<String, Set<Integer>> activeTasks,
+ Map<String, Set<Integer>> standbyTasks,
+ Map<String, Set<Integer>> warmupTasks) {
+
+ public Assignment {
+ activeTasks =
Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
+ standbyTasks =
Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
+ warmupTasks =
Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));
+ }
+
+ /**
+ * An empty assignment: all three task maps are {@code Collections.emptyMap()}.
+ */
+ public static final Assignment EMPTY = new Assignment(
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap()
+ );
+
+ /**
+ * Creates an {@link org.apache.kafka.coordinator.group.streams.Assignment} from a
+ * {@link org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue}.
+ * <p>
+ * Each task list of the record is collected into a map from subtopology ID to a new
+ * {@code HashSet} holding the listed partitions. Because {@code Collectors.toMap} is used
+ * without a merge function, a subtopology ID that occurs more than once within one list
+ * results in an {@code IllegalStateException}.
+ *
+ * @param record The record.
+ * @return An {@link org.apache.kafka.coordinator.group.streams.Assignment}.
+ */
+ public static Assignment fromRecord(
+ StreamsGroupTargetAssignmentMemberValue record
+ ) {
+ return new Assignment(
+ record.activeTasks().stream()
+ .collect(Collectors.toMap(
+
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
+ taskId -> new HashSet<>(taskId.partitions())
+ )
+ ),
+ record.standbyTasks().stream()
+ .collect(Collectors.toMap(
+
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
+ taskId -> new HashSet<>(taskId.partitions())
+ )
+ ),
+ record.warmupTasks().stream()
+ .collect(Collectors.toMap(
+
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
+ taskId -> new HashSet<>(taskId.partitions())
+ )
+ )
+ );
+ }
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java
new file mode 100644
index 00000000000..71914da48b2
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The various states that a member can be in. For their definition, refer to the documentation of
+ * {@link org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder}.
+ * <p>
+ * Each state carries a byte value; {@code fromValue} maps a byte value back to its state.
+ */
+public enum MemberState {
+
+ /**
+ * The member is fully reconciled with the desired target assignment.
+ */
+ STABLE((byte) 1),
+
+ /**
+ * The member must revoke some tasks in order to be able to transition to the next epoch.
+ */
+ UNREVOKED_TASKS((byte) 2),
+
+ /**
+ * The member transitioned to the last epoch but waits on some tasks which have not been
+ * revoked by their previous owners yet.
+ */
+ UNRELEASED_TASKS((byte) 3),
+
+ /**
+ * The member is in an unknown state. This can only happen if a future version of the
+ * software introduces a new state unknown by this
+ * version.
+ */
+ UNKNOWN((byte) 127);
+
+ private static final Map<Byte, MemberState> VALUES_TO_ENUMS = new
HashMap<>(); // lookup table from byte value to constant, filled by the static block below
+
+ static {
+ for (MemberState state : MemberState.values()) {
+ VALUES_TO_ENUMS.put(state.value(), state);
+ }
+ }
+
+ private final byte value;
+
+ MemberState(byte value) {
+ this.value = value;
+ }
+
+ public byte value() {
+ return value;
+ }
+
+ public static MemberState fromValue(byte value) {
+ MemberState state = VALUES_TO_ENUMS.get(value);
+ if (state == null) {
+ return UNKNOWN; // no constant is registered for this byte value
+ }
+ return state;
+ }
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
new file mode 100644
index 00000000000..e23df3f5701
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Contains all information related to a member within a Streams group.
+ * <p>
+ * This class is immutable and is fully backed by records stored in the
__consumer_offsets topic.
+ *
+ * @param memberId The ID of the member.
+ * @param memberEpoch The current epoch of the member.
+ * @param previousMemberEpoch The previous epoch of the member.
+ * @param state The current state of the member.
+ * @param instanceId The instance ID of the member.
+ * @param rackId The rack ID of the member.
+ * @param clientId The client ID of the member.
+ * @param clientHost The host of the member.
+ * @param rebalanceTimeoutMs The rebalance timeout in milliseconds.
+ * @param topologyEpoch The epoch of the topology the member
uses.
+ * @param processId The ID of the Streams client that
contains the member.
+ * @param userEndpoint The user endpoint exposed for
Interactive Queries by the Streams client that
+ * contains the member.
+ * @param clientTags Tags of the client of the member used
for rack-aware assignment.
+ * @param assignedActiveTasks Active tasks assigned to the member.
+ * The key of the map is the subtopology
ID and the value is the set of partition IDs.
+ * @param assignedStandbyTasks Standby tasks assigned to the member.
+ * The key of the map is the subtopology
ID and the value is the set of partition IDs.
+ * @param assignedWarmupTasks Warm-up tasks assigned to the member.
+ * The key of the map is the subtopology
ID and the value is the set of partition IDs.
+ * @param activeTasksPendingRevocation Active tasks assigned to the member
pending revocation.
+ * The key of the map is the subtopology
ID and the value is the set of partition IDs.
+ * @param standbyTasksPendingRevocation Standby tasks assigned to the member
pending revocation.
+ * The key of the map is the subtopology
ID and the value is the set of partition IDs.
+ * @param warmupTasksPendingRevocation Warm-up tasks assigned to the member
pending revocation.
+ * The key of the map is the subtopology
ID and the value is the set of partition IDs.
+ */
+@SuppressWarnings("checkstyle:JavaNCSS")
+public record StreamsGroupMember(String memberId,
+ Integer memberEpoch,
+ Integer previousMemberEpoch,
+ MemberState state,
+ Optional<String> instanceId,
+ Optional<String> rackId,
+ String clientId,
+ String clientHost,
+ Integer rebalanceTimeoutMs,
+ Integer topologyEpoch,
+ String processId,
+
Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint,
+ Map<String, String> clientTags,
+ Map<String, Set<Integer>> assignedActiveTasks,
+ Map<String, Set<Integer>>
assignedStandbyTasks,
+ Map<String, Set<Integer>> assignedWarmupTasks,
+ Map<String, Set<Integer>>
activeTasksPendingRevocation,
+ Map<String, Set<Integer>>
standbyTasksPendingRevocation,
+ Map<String, Set<Integer>>
warmupTasksPendingRevocation) {
+
+ public StreamsGroupMember { // compact canonical constructor: runs before the record components are assigned
+ Objects.requireNonNull(memberId, "memberId cannot be null"); // memberId is the only component that is null-checked here
+ clientTags = clientTags != null ?
Collections.unmodifiableMap(clientTags) : null; // every map below: wrapped in an unmodifiable view when non-null, otherwise kept as null
+ assignedActiveTasks = assignedActiveTasks != null ?
Collections.unmodifiableMap(assignedActiveTasks) : null;
+ assignedStandbyTasks = assignedStandbyTasks != null ?
Collections.unmodifiableMap(assignedStandbyTasks) : null;
+ assignedWarmupTasks = assignedWarmupTasks != null ?
Collections.unmodifiableMap(assignedWarmupTasks) : null;
+ activeTasksPendingRevocation = activeTasksPendingRevocation != null ?
Collections.unmodifiableMap(activeTasksPendingRevocation) : null;
+ standbyTasksPendingRevocation = standbyTasksPendingRevocation != null
? Collections.unmodifiableMap(standbyTasksPendingRevocation) : null;
+ warmupTasksPendingRevocation = warmupTasksPendingRevocation != null ?
Collections.unmodifiableMap(warmupTasksPendingRevocation) : null;
+ }
+
+ /**
+ * A builder that facilitates the creation of a new member or the update of an existing one.
+ * <p>
+ * Please refer to the javadoc of {@link StreamsGroupMember} for the definition of the fields.
+ */
+ public static class Builder {
+
+ private final String memberId;
+ private Integer memberEpoch = null;
+ private Integer previousMemberEpoch = null;
+ private MemberState state = null;
+ private Optional<String> instanceId = null;
+ private Optional<String> rackId = null;
+ private Integer rebalanceTimeoutMs = null;
+ private String clientId = null;
+ private String clientHost = null;
+ private Integer topologyEpoch = null;
+ private String processId = null;
+ private Optional<StreamsGroupMemberMetadataValue.Endpoint>
userEndpoint = null;
+ private Map<String, String> clientTags = null;
+ private Map<String, Set<Integer>> assignedActiveTasks = null;
+ private Map<String, Set<Integer>> assignedStandbyTasks = null;
+ private Map<String, Set<Integer>> assignedWarmupTasks = null;
+ private Map<String, Set<Integer>> activeTasksPendingRevocation = null;
+ private Map<String, Set<Integer>> standbyTasksPendingRevocation = null;
+ private Map<String, Set<Integer>> warmupTasksPendingRevocation = null;
+
+ public Builder(String memberId) {
+ this.memberId = Objects.requireNonNull(memberId, "memberId cannot
be null");
+ }
+
+ public Builder(StreamsGroupMember member) {
+ Objects.requireNonNull(member, "member cannot be null");
+
+ this.memberId = member.memberId;
+ this.memberEpoch = member.memberEpoch;
+ this.previousMemberEpoch = member.previousMemberEpoch;
+ this.instanceId = member.instanceId;
+ this.rackId = member.rackId;
+ this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+ this.clientId = member.clientId;
+ this.clientHost = member.clientHost;
+ this.topologyEpoch = member.topologyEpoch;
+ this.processId = member.processId;
+ this.userEndpoint = member.userEndpoint;
+ this.clientTags = member.clientTags;
+ this.state = member.state;
+ this.assignedActiveTasks = member.assignedActiveTasks;
+ this.assignedStandbyTasks = member.assignedStandbyTasks;
+ this.assignedWarmupTasks = member.assignedWarmupTasks;
+ this.activeTasksPendingRevocation =
member.activeTasksPendingRevocation;
+ this.standbyTasksPendingRevocation =
member.standbyTasksPendingRevocation;
+ this.warmupTasksPendingRevocation =
member.warmupTasksPendingRevocation;
+ }
+
+ public Builder updateMemberEpoch(int memberEpoch) {
+ int currentMemberEpoch = this.memberEpoch; // NOTE(review): unboxes the nullable Integer field; throws NullPointerException if no member epoch has been set on this builder yet
+ this.memberEpoch = memberEpoch;
+ this.previousMemberEpoch = currentMemberEpoch; // the epoch held before this call becomes the previous epoch
+ return this;
+ }
+
+ public Builder setMemberEpoch(int memberEpoch) {
+ this.memberEpoch = memberEpoch;
+ return this;
+ }
+
+ public Builder setPreviousMemberEpoch(int previousMemberEpoch) {
+ this.previousMemberEpoch = previousMemberEpoch;
+ return this;
+ }
+
+ public Builder setInstanceId(String instanceId) {
+ this.instanceId = Optional.ofNullable(instanceId);
+ return this;
+ }
+
+ public Builder maybeUpdateInstanceId(Optional<String> instanceId) {
+ instanceId.ifPresent(this::setInstanceId);
+ return this;
+ }
+
+ public Builder setRackId(String rackId) {
+ this.rackId = Optional.ofNullable(rackId);
+ return this;
+ }
+
+ public Builder maybeUpdateRackId(Optional<String> rackId) {
+ rackId.ifPresent(this::setRackId);
+ return this;
+ }
+
+ public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+ this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+ return this;
+ }
+
+ /**
+ * Sets the rebalance timeout only if the given value is present; otherwise the value
+ * currently held by the builder (possibly still unset) is kept.
+ *
+ * @param rebalanceTimeoutMs The optional new rebalance timeout in milliseconds.
+ * @return This builder.
+ */
+ public Builder maybeUpdateRebalanceTimeoutMs(OptionalInt rebalanceTimeoutMs) {
+ // orElse(this.rebalanceTimeoutMs) would unbox the nullable Integer field eagerly and
+ // throw a NullPointerException on a fresh builder, even when a new value is present.
+ rebalanceTimeoutMs.ifPresent(this::setRebalanceTimeoutMs);
+ return this;
+ }
+
+ public Builder setClientId(String clientId) {
+ this.clientId = clientId;
+ return this;
+ }
+
+ public Builder setClientHost(String clientHost) {
+ this.clientHost = clientHost;
+ return this;
+ }
+
+ public Builder setState(MemberState state) {
+ this.state = state;
+ return this;
+ }
+
+ public Builder setTopologyEpoch(int topologyEpoch) {
+ this.topologyEpoch = topologyEpoch;
+ return this;
+ }
+
+ /**
+ * Sets the topology epoch only if the given value is present; otherwise the value
+ * currently held by the builder (possibly still unset) is kept.
+ *
+ * @param topologyEpoch The optional new topology epoch.
+ * @return This builder.
+ */
+ public Builder maybeUpdateTopologyEpoch(OptionalInt topologyEpoch) {
+ // orElse(this.topologyEpoch) would unbox the nullable Integer field eagerly and
+ // throw a NullPointerException on a fresh builder, even when a new value is present.
+ topologyEpoch.ifPresent(this::setTopologyEpoch);
+ return this;
+ }
+
+ public Builder setProcessId(String processId) {
+ this.processId = processId;
+ return this;
+ }
+
+ public Builder maybeUpdateProcessId(Optional<String> processId) {
+ this.processId = processId.orElse(this.processId);
+ return this;
+ }
+
+ public Builder
setUserEndpoint(StreamsGroupMemberMetadataValue.Endpoint userEndpoint) {
+ this.userEndpoint = Optional.ofNullable(userEndpoint);
+ return this;
+ }
+
+ public Builder
maybeUpdateUserEndpoint(Optional<StreamsGroupMemberMetadataValue.Endpoint>
userEndpoint) {
+ userEndpoint.ifPresent(this::setUserEndpoint);
+ return this;
+ }
+
+ public Builder setClientTags(Map<String, String> clientTags) {
+ this.clientTags = clientTags;
+ return this;
+ }
+
+ public Builder maybeUpdateClientTags(Optional<Map<String, String>>
clientTags) {
+ this.clientTags = clientTags.orElse(this.clientTags);
+ return this;
+ }
+
+ public Builder setAssignment(Assignment assignment) {
+ this.assignedActiveTasks = assignment.activeTasks();
+ this.assignedStandbyTasks = assignment.standbyTasks();
+ this.assignedWarmupTasks = assignment.warmupTasks();
+ return this;
+ }
+
+ public Builder setAssignedActiveTasks(Map<String, Set<Integer>>
assignedActiveTasks) {
+ this.assignedActiveTasks = assignedActiveTasks;
+ return this;
+ }
+
+ public Builder setAssignedStandbyTasks(Map<String, Set<Integer>>
assignedStandbyTasks) {
+ this.assignedStandbyTasks = assignedStandbyTasks;
+ return this;
+ }
+
+ public Builder setAssignedWarmupTasks(Map<String, Set<Integer>>
assignedWarmupTasks) {
+ this.assignedWarmupTasks = assignedWarmupTasks;
+ return this;
+ }
+
+ public Builder setAssignmentPendingRevocation(Assignment assignment) {
+ this.activeTasksPendingRevocation = assignment.activeTasks();
+ this.standbyTasksPendingRevocation = assignment.standbyTasks();
+ this.warmupTasksPendingRevocation = assignment.warmupTasks();
+ return this;
+ }
+
+ public Builder setActiveTasksPendingRevocation(
+ Map<String, Set<Integer>> activeTasksPendingRevocation) {
+ this.activeTasksPendingRevocation = activeTasksPendingRevocation;
+ return this;
+ }
+
+ public Builder setStandbyTasksPendingRevocation(
+ Map<String, Set<Integer>> standbyTasksPendingRevocation) {
+ this.standbyTasksPendingRevocation = standbyTasksPendingRevocation;
+ return this;
+ }
+
+ public Builder setWarmupTasksPendingRevocation(
+ Map<String, Set<Integer>> warmupTasksPendingRevocation) {
+ this.warmupTasksPendingRevocation = warmupTasksPendingRevocation;
+ return this;
+ }
+
+ public Builder updateWith(StreamsGroupMemberMetadataValue record) {
+ setInstanceId(record.instanceId());
+ setRackId(record.rackId());
+ setClientId(record.clientId());
+ setClientHost(record.clientHost());
+ setRebalanceTimeoutMs(record.rebalanceTimeoutMs());
+ setTopologyEpoch(record.topologyEpoch());
+ setProcessId(record.processId());
+ setUserEndpoint(record.userEndpoint());
+
setClientTags(record.clientTags().stream().collect(Collectors.toMap(
+ StreamsGroupMemberMetadataValue.KeyValue::key,
+ StreamsGroupMemberMetadataValue.KeyValue::value
+ )));
+ return this;
+ }
+
+ public Builder updateWith(StreamsGroupCurrentMemberAssignmentValue
record) {
+ setMemberEpoch(record.memberEpoch());
+ setPreviousMemberEpoch(record.previousMemberEpoch());
+ setState(MemberState.fromValue(record.state()));
+
setAssignedActiveTasks(assignmentFromTaskIds(record.activeTasks()));
+
setAssignedStandbyTasks(assignmentFromTaskIds(record.standbyTasks()));
+
setAssignedWarmupTasks(assignmentFromTaskIds(record.warmupTasks()));
+ setActiveTasksPendingRevocation(
+ assignmentFromTaskIds(record.activeTasksPendingRevocation()));
+ setStandbyTasksPendingRevocation(
+ assignmentFromTaskIds(record.standbyTasksPendingRevocation()));
+ setWarmupTasksPendingRevocation(
+ assignmentFromTaskIds(record.warmupTasksPendingRevocation()));
+ return this;
+ }
+
+ private static Map<String, Set<Integer>> assignmentFromTaskIds(
+ List<StreamsGroupCurrentMemberAssignmentValue.TaskIds>
topicPartitionsList
+ ) {
+ return topicPartitionsList.stream().collect(Collectors.toMap(
+
StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId,
+ taskIds -> Set.copyOf(taskIds.partitions())));
+ }
+
+ public StreamsGroupMember build() {
+ return new StreamsGroupMember(
+ memberId,
+ memberEpoch,
+ previousMemberEpoch,
+ state,
+ instanceId,
+ rackId,
+ clientId,
+ clientHost,
+ rebalanceTimeoutMs,
+ topologyEpoch,
+ processId,
+ userEndpoint,
+ clientTags,
+ assignedActiveTasks,
+ assignedStandbyTasks,
+ assignedWarmupTasks,
+ activeTasksPendingRevocation,
+ standbyTasksPendingRevocation,
+ warmupTasksPendingRevocation
+ );
+ }
+ }
+
+ /**
+ * Checks whether the member has caught up with the given target assignment epoch.
+ * <p>
+ * NOTE(review): memberEpoch is a nullable Integer that is unboxed for the comparison, so a
+ * member in the Stable state without a member epoch would throw a NullPointerException.
+ *
+ * @param targetAssignmentEpoch The epoch of the target assignment to compare against.
+ * @return True if the member is in the Stable state and at the desired epoch.
+ */
+ public boolean isReconciledTo(int targetAssignmentEpoch) {
+ return state == MemberState.STABLE && memberEpoch ==
targetAssignmentEpoch;
+ }
+
+ /**
+ * Creates a member description for the Streams group describe response from this member.
+ * <p>
+ * The assigned active, standby and warm-up task maps, the client tags, the instance ID,
+ * the rack ID and the user endpoint are dereferenced without null checks, so they must
+ * all be set on this member.
+ *
+ * @param targetAssignment The target assignment of this member in the corresponding group.
+ * If null, the described target assignment is a newly constructed
+ * assignment on which no task lists are set by this method.
+ *
+ * @return The StreamsGroupMember mapped as StreamsGroupDescribeResponseData.Member.
+ */
+ public StreamsGroupDescribeResponseData.Member
asStreamsGroupDescribeMember(
+ Assignment targetAssignment
+ ) {
+ final StreamsGroupDescribeResponseData.Assignment
describedTargetAssignment =
+ new StreamsGroupDescribeResponseData.Assignment();
+
+ if (targetAssignment != null) {
+ describedTargetAssignment
+ .setActiveTasks(taskIdsFromMap(targetAssignment.activeTasks()))
+
.setStandbyTasks(taskIdsFromMap(targetAssignment.standbyTasks()))
+
.setWarmupTasks(taskIdsFromMap(targetAssignment.warmupTasks()));
+ }
+
+ return new StreamsGroupDescribeResponseData.Member()
+ .setMemberEpoch(memberEpoch)
+ .setMemberId(memberId)
+ .setAssignment(
+ new StreamsGroupDescribeResponseData.Assignment()
+ .setActiveTasks(taskIdsFromMap(assignedActiveTasks))
+ .setStandbyTasks(taskIdsFromMap(assignedStandbyTasks))
+ .setWarmupTasks(taskIdsFromMap(assignedWarmupTasks)))
+ .setTargetAssignment(describedTargetAssignment)
+ .setClientHost(clientHost)
+ .setClientId(clientId)
+ .setInstanceId(instanceId.orElse(null))
+ .setRackId(rackId.orElse(null))
+ .setClientTags(clientTags.entrySet().stream().map(
+ entry -> new StreamsGroupDescribeResponseData.KeyValue()
+ .setKey(entry.getKey())
+ .setValue(entry.getValue())
+ ).collect(Collectors.toList()))
+ .setProcessId(processId)
+ .setTopologyEpoch(topologyEpoch)
+ .setUserEndpoint(
+ userEndpoint.map(
+ endpoint -> new StreamsGroupDescribeResponseData.Endpoint()
+ .setHost(endpoint.host())
+ .setPort(endpoint.port())
+ ).orElse(null)
+ );
+ }
+
+ private static List<StreamsGroupDescribeResponseData.TaskIds>
taskIdsFromMap(
+ Map<String, Set<Integer>> tasks
+ ) {
+ List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new
ArrayList<>();
+ tasks.forEach((subtopologyId, partitionSet) -> {
+ taskIds.add(new StreamsGroupDescribeResponseData.TaskIds()
+ .setSubtopologyId(subtopologyId)
+ .setPartitions(new ArrayList<>(partitionSet)));
+ });
+ return taskIds;
+ }
+
+ /**
+ * Compares the assigned active tasks of two members using map equality.
+ *
+ * @param member1 The first member.
+ * @param member2 The second member.
+ * @return True if the two provided members have different assigned active tasks.
+ */
+ public static boolean hasAssignedActiveTasksChanged(
+ StreamsGroupMember member1,
+ StreamsGroupMember member2
+ ) {
+ return
!member1.assignedActiveTasks().equals(member2.assignedActiveTasks());
+ }
+
+ /**
+ * Compares the assigned standby tasks of two members using map equality.
+ *
+ * @param member1 The first member.
+ * @param member2 The second member.
+ * @return True if the two provided members have different assigned standby tasks.
+ */
+ public static boolean hasAssignedStandbyTasksChanged(
+ StreamsGroupMember member1,
+ StreamsGroupMember member2
+ ) {
+ return
!member1.assignedStandbyTasks().equals(member2.assignedStandbyTasks());
+ }
+
+ /**
+ * Compares the assigned warm-up tasks of two members using map equality.
+ *
+ * @param member1 The first member.
+ * @param member2 The second member.
+ * @return True if the two provided members have different assigned warm-up tasks.
+ */
+ public static boolean hasAssignedWarmupTasksChanged(
+ StreamsGroupMember member1,
+ StreamsGroupMember member2
+ ) {
+ return
!member1.assignedWarmupTasks().equals(member2.assignedWarmupTasks());
+ }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
new file mode 100644
index 00000000000..7c0baf27364
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class AssignmentTest {
+
+ static final String SUBTOPOLOGY_1 = "subtopology1";
+ static final String SUBTOPOLOGY_2 = "subtopology2";
+ static final String SUBTOPOLOGY_3 = "subtopology3";
+
+ @Test
+ public void testTasksCannotBeNull() {
+ assertThrows(NullPointerException.class, () -> new Assignment(null,
Collections.emptyMap(), Collections.emptyMap()));
+ assertThrows(NullPointerException.class, () -> new
Assignment(Collections.emptyMap(), null, Collections.emptyMap()));
+ assertThrows(NullPointerException.class, () -> new
Assignment(Collections.emptyMap(), Collections.emptyMap(), null));
+ }
+
+ @Test
+ public void testReturnUnmodifiableTaskAssignments() {
+ Map<String, Set<Integer>> activeTasks = mkTasksPerSubtopology(
+ mkTasks(SUBTOPOLOGY_1, 1, 2, 3)
+ );
+ Map<String, Set<Integer>> standbyTasks = mkTasksPerSubtopology(
+ mkTasks(SUBTOPOLOGY_2, 9, 8, 7)
+ );
+ Map<String, Set<Integer>> warmupTasks = mkTasksPerSubtopology(
+ mkTasks(SUBTOPOLOGY_3, 4, 5, 6)
+ );
+ Assignment assignment = new Assignment(activeTasks, standbyTasks,
warmupTasks);
+
+ assertEquals(activeTasks, assignment.activeTasks());
+ assertThrows(UnsupportedOperationException.class, () ->
assignment.activeTasks().put("not allowed", Collections.emptySet()));
+ assertEquals(standbyTasks, assignment.standbyTasks());
+ assertThrows(UnsupportedOperationException.class, () ->
assignment.standbyTasks().put("not allowed", Collections.emptySet()));
+ assertEquals(warmupTasks, assignment.warmupTasks());
+ assertThrows(UnsupportedOperationException.class, () ->
assignment.warmupTasks().put("not allowed", Collections.emptySet()));
+ }
+
+ @Test
+ public void testFromTargetAssignmentRecord() {
+ List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTasks =
new ArrayList<>();
+ activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_1)
+ .setPartitions(Arrays.asList(1, 2, 3)));
+ activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_2)
+ .setPartitions(Arrays.asList(4, 5, 6)));
+ List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTasks =
new ArrayList<>();
+ standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_1)
+ .setPartitions(Arrays.asList(7, 8, 9)));
+ standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_2)
+ .setPartitions(Arrays.asList(1, 2, 3)));
+ List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTasks =
new ArrayList<>();
+ warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_1)
+ .setPartitions(Arrays.asList(4, 5, 6)));
+ warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_2)
+ .setPartitions(Arrays.asList(7, 8, 9)));
+
+ StreamsGroupTargetAssignmentMemberValue record = new
StreamsGroupTargetAssignmentMemberValue()
+ .setActiveTasks(activeTasks)
+ .setStandbyTasks(standbyTasks)
+ .setWarmupTasks(warmupTasks);
+
+ Assignment assignment = Assignment.fromRecord(record);
+
+ assertEquals(
+ mkTasksPerSubtopology(
+ mkTasks(SUBTOPOLOGY_1, 1, 2, 3),
+ mkTasks(SUBTOPOLOGY_2, 4, 5, 6)
+ ),
+ assignment.activeTasks()
+ );
+ assertEquals(
+ mkTasksPerSubtopology(
+ mkTasks(SUBTOPOLOGY_1, 7, 8, 9),
+ mkTasks(SUBTOPOLOGY_2, 1, 2, 3)
+ ),
+ assignment.standbyTasks()
+ );
+ assertEquals(
+ mkTasksPerSubtopology(
+ mkTasks(SUBTOPOLOGY_1, 4, 5, 6),
+ mkTasks(SUBTOPOLOGY_2, 7, 8, 9)
+ ),
+ assignment.warmupTasks()
+ );
+ }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
new file mode 100644
index 00000000000..8c6d3d9088a
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue.TaskIds;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.KeyValue;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class StreamsGroupMemberTest {
+
+    // Identity, epochs and state of the member built by createStreamsGroupMember().
+    private static final String MEMBER_ID = "member-id";
+    private static final int MEMBER_EPOCH = 10;
+    private static final int PREVIOUS_MEMBER_EPOCH = 9;
+    private static final MemberState STATE = MemberState.UNRELEASED_TASKS;
+
+    // Client-side metadata of the member.
+    private static final String INSTANCE_ID = "instance-id";
+    private static final String RACK_ID = "rack-id";
+    private static final int REBALANCE_TIMEOUT = 5000;
+    private static final String CLIENT_ID = "client-id";
+    private static final String HOSTNAME = "hostname";
+    private static final int TOPOLOGY_EPOCH = 3;
+    private static final String PROCESS_ID = "process-id";
+    private static final StreamsGroupMemberMetadataValue.Endpoint USER_ENDPOINT =
+        new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090);
+    private static final String CLIENT_TAG_KEY = "client";
+    private static final String CLIENT_TAG_VALUE = "tag";
+    private static final Map<String, String> CLIENT_TAGS = mkMap(mkEntry(CLIENT_TAG_KEY, CLIENT_TAG_VALUE));
+
+    // Subtopology ids and raw task id lists; the lists are also used directly as record partitions.
+    private static final String SUBTOPOLOGY1 = "subtopology1";
+    private static final String SUBTOPOLOGY2 = "subtopology2";
+    private static final String SUBTOPOLOGY3 = "subtopology3";
+    private static final List<Integer> TASKS1 = List.of(1, 2, 3);
+    private static final List<Integer> TASKS2 = List.of(4, 5, 6);
+    private static final List<Integer> TASKS3 = List.of(7, 8);
+    private static final List<Integer> TASKS4 = List.of(3, 2, 1);
+    private static final List<Integer> TASKS5 = List.of(6, 5, 4);
+    private static final List<Integer> TASKS6 = List.of(9, 7);
+
+    // Tasks currently assigned to the member, keyed by subtopology id.
+    private static final Map<String, Set<Integer>> ASSIGNED_ACTIVE_TASKS =
+        mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS1.toArray(Integer[]::new)));
+    private static final Map<String, Set<Integer>> ASSIGNED_STANDBY_TASKS =
+        mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS2.toArray(Integer[]::new)));
+    private static final Map<String, Set<Integer>> ASSIGNED_WARMUP_TASKS =
+        mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS3.toArray(Integer[]::new)));
+
+    // Tasks pending revocation, keyed by subtopology id.
+    private static final Map<String, Set<Integer>> ACTIVE_TASKS_PENDING_REVOCATION =
+        mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS4.toArray(Integer[]::new)));
+    private static final Map<String, Set<Integer>> STANDBY_TASKS_PENDING_REVOCATION =
+        mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS5.toArray(Integer[]::new)));
+    private static final Map<String, Set<Integer>> WARMUP_TASKS_PENDING_REVOCATION =
+        mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS6.toArray(Integer[]::new)));
+
+    @Test
+    public void testBuilderWithMemberIdIsNull() {
+        // The cast selects the Builder constructor overload that takes a member id.
+        final NullPointerException thrown = assertThrows(
+            NullPointerException.class,
+            () -> new StreamsGroupMember.Builder((String) null).build()
+        );
+
+        assertEquals("memberId cannot be null", thrown.getMessage());
+    }
+
+    @Test
+    public void testBuilderWithMemberIsNull() {
+        // The cast selects the Builder constructor overload that takes an existing StreamsGroupMember.
+        final NullPointerException thrown = assertThrows(
+            NullPointerException.class,
+            () -> new StreamsGroupMember.Builder((StreamsGroupMember) null).build()
+        );
+
+        assertEquals("member cannot be null", thrown.getMessage());
+    }
+
+    @Test
+    public void testBuilderWithDefaults() {
+        final StreamsGroupMember defaults = new StreamsGroupMember.Builder(MEMBER_ID).build();
+
+        // Only the member id handed to the builder is populated.
+        assertEquals(MEMBER_ID, defaults.memberId());
+
+        // Epochs and state are left unset.
+        assertNull(defaults.memberEpoch());
+        assertNull(defaults.previousMemberEpoch());
+        assertNull(defaults.state());
+
+        // Client metadata is left unset.
+        assertNull(defaults.instanceId());
+        assertNull(defaults.rackId());
+        assertNull(defaults.rebalanceTimeoutMs());
+        assertNull(defaults.clientId());
+        assertNull(defaults.clientHost());
+        assertNull(defaults.topologyEpoch());
+        assertNull(defaults.processId());
+        assertNull(defaults.userEndpoint());
+        assertNull(defaults.clientTags());
+
+        // Assigned tasks and tasks pending revocation are left unset.
+        assertNull(defaults.assignedActiveTasks());
+        assertNull(defaults.assignedStandbyTasks());
+        assertNull(defaults.assignedWarmupTasks());
+        assertNull(defaults.activeTasksPendingRevocation());
+        assertNull(defaults.standbyTasksPendingRevocation());
+        assertNull(defaults.warmupTasksPendingRevocation());
+    }
+
+    /**
+     * Verifies that every value passed to the builder setters in
+     * {@code createStreamsGroupMember()} is returned by the matching accessor.
+     */
+    @Test
+    public void testBuilderNewMember() {
+        StreamsGroupMember member = createStreamsGroupMember();
+
+        assertEquals(MEMBER_ID, member.memberId());
+        assertEquals(MEMBER_EPOCH, member.memberEpoch());
+        assertEquals(PREVIOUS_MEMBER_EPOCH, member.previousMemberEpoch());
+        assertEquals(STATE, member.state());
+        assertEquals(Optional.of(INSTANCE_ID), member.instanceId());
+        assertEquals(Optional.of(RACK_ID), member.rackId());
+        // The fixture sets the rebalance timeout as well; it was previously never checked here.
+        assertEquals(REBALANCE_TIMEOUT, member.rebalanceTimeoutMs());
+        assertEquals(CLIENT_ID, member.clientId());
+        assertEquals(HOSTNAME, member.clientHost());
+        assertEquals(TOPOLOGY_EPOCH, member.topologyEpoch());
+        assertEquals(PROCESS_ID, member.processId());
+        assertEquals(Optional.of(USER_ENDPOINT), member.userEndpoint());
+        assertEquals(CLIENT_TAGS, member.clientTags());
+        assertEquals(ASSIGNED_ACTIVE_TASKS, member.assignedActiveTasks());
+        assertEquals(ASSIGNED_STANDBY_TASKS, member.assignedStandbyTasks());
+        assertEquals(ASSIGNED_WARMUP_TASKS, member.assignedWarmupTasks());
+        assertEquals(ACTIVE_TASKS_PENDING_REVOCATION, member.activeTasksPendingRevocation());
+        assertEquals(STANDBY_TASKS_PENDING_REVOCATION, member.standbyTasksPendingRevocation());
+        assertEquals(WARMUP_TASKS_PENDING_REVOCATION, member.warmupTasksPendingRevocation());
+    }
+
+    /**
+     * Verifies that {@code Builder.updateWith(StreamsGroupMemberMetadataValue)} copies the
+     * metadata fields of the record into the member and leaves epochs, state and task
+     * assignments unset.
+     */
+    @Test
+    public void testBuilderUpdateWithStreamsGroupMemberMetadataValue() {
+        StreamsGroupMemberMetadataValue record = new StreamsGroupMemberMetadataValue()
+            .setClientId(CLIENT_ID)
+            .setClientHost(HOSTNAME)
+            .setInstanceId(INSTANCE_ID)
+            .setRackId(RACK_ID)
+            .setRebalanceTimeoutMs(REBALANCE_TIMEOUT)
+            .setTopologyEpoch(TOPOLOGY_EPOCH)
+            .setProcessId(PROCESS_ID)
+            .setUserEndpoint(USER_ENDPOINT)
+            .setClientTags(CLIENT_TAGS.entrySet().stream()
+                .map(e -> new KeyValue().setKey(e.getKey()).setValue(e.getValue()))
+                .collect(Collectors.toList()));
+
+        // Use the shared constant (not a duplicated literal) so the memberId assertion below
+        // cannot drift from the value handed to the builder.
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
+            .updateWith(record)
+            .build();
+
+        // Fields taken from the metadata record.
+        assertEquals(record.clientId(), member.clientId());
+        assertEquals(record.clientHost(), member.clientHost());
+        assertEquals(Optional.of(record.instanceId()), member.instanceId());
+        assertEquals(Optional.of(record.rackId()), member.rackId());
+        assertEquals(record.rebalanceTimeoutMs(), member.rebalanceTimeoutMs());
+        assertEquals(record.topologyEpoch(), member.topologyEpoch());
+        assertEquals(record.processId(), member.processId());
+        assertEquals(Optional.of(record.userEndpoint()), member.userEndpoint());
+        assertEquals(
+            record.clientTags().stream().collect(Collectors.toMap(KeyValue::key, KeyValue::value)),
+            member.clientTags()
+        );
+
+        // Fields the metadata record does not carry.
+        assertEquals(MEMBER_ID, member.memberId());
+        assertNull(member.memberEpoch());
+        assertNull(member.previousMemberEpoch());
+        assertNull(member.state());
+        assertNull(member.assignedActiveTasks());
+        assertNull(member.assignedStandbyTasks());
+        assertNull(member.assignedWarmupTasks());
+        assertNull(member.activeTasksPendingRevocation());
+        assertNull(member.standbyTasksPendingRevocation());
+        assertNull(member.warmupTasksPendingRevocation());
+    }
+
+    // NOTE(review): the method name says "ConsumerGroup", but the record exercised here is
+    // StreamsGroupCurrentMemberAssignmentValue. The name is kept as is.
+    @Test
+    public void testBuilderUpdateWithConsumerGroupCurrentMemberAssignmentValue() {
+        final List<TaskIds> active = List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY1).setPartitions(TASKS1));
+        final List<TaskIds> standby = List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY2).setPartitions(TASKS2));
+        final List<TaskIds> warmup = List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY1).setPartitions(TASKS3));
+        final List<TaskIds> activeRevoking = List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY2).setPartitions(TASKS4));
+        final List<TaskIds> standbyRevoking = List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY1).setPartitions(TASKS5));
+        final List<TaskIds> warmupRevoking = List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY2).setPartitions(TASKS6));
+
+        final StreamsGroupCurrentMemberAssignmentValue assignmentRecord = new StreamsGroupCurrentMemberAssignmentValue()
+            .setMemberEpoch(MEMBER_EPOCH)
+            .setPreviousMemberEpoch(PREVIOUS_MEMBER_EPOCH)
+            .setState(STATE.value())
+            .setActiveTasks(active)
+            .setStandbyTasks(standby)
+            .setWarmupTasks(warmup)
+            .setActiveTasksPendingRevocation(activeRevoking)
+            .setStandbyTasksPendingRevocation(standbyRevoking)
+            .setWarmupTasksPendingRevocation(warmupRevoking);
+
+        final StreamsGroupMember updated = new StreamsGroupMember.Builder(MEMBER_ID)
+            .updateWith(assignmentRecord)
+            .build();
+
+        // Epochs, state and tasks come from the record.
+        assertEquals(MEMBER_ID, updated.memberId());
+        assertEquals(assignmentRecord.memberEpoch(), updated.memberEpoch());
+        assertEquals(assignmentRecord.previousMemberEpoch(), updated.previousMemberEpoch());
+        assertEquals(MemberState.fromValue(assignmentRecord.state()), updated.state());
+        assertEquals(ASSIGNED_ACTIVE_TASKS, updated.assignedActiveTasks());
+        assertEquals(ASSIGNED_STANDBY_TASKS, updated.assignedStandbyTasks());
+        assertEquals(ASSIGNED_WARMUP_TASKS, updated.assignedWarmupTasks());
+        assertEquals(ACTIVE_TASKS_PENDING_REVOCATION, updated.activeTasksPendingRevocation());
+        assertEquals(STANDBY_TASKS_PENDING_REVOCATION, updated.standbyTasksPendingRevocation());
+        assertEquals(WARMUP_TASKS_PENDING_REVOCATION, updated.warmupTasksPendingRevocation());
+
+        // Metadata fields are not part of the record and stay unset.
+        assertNull(updated.instanceId());
+        assertNull(updated.rackId());
+        assertNull(updated.rebalanceTimeoutMs());
+        assertNull(updated.clientId());
+        assertNull(updated.clientHost());
+        assertNull(updated.topologyEpoch());
+        assertNull(updated.processId());
+        assertNull(updated.userEndpoint());
+        assertNull(updated.clientTags());
+    }
+
+    /**
+     * Verifies the {@code maybeUpdate*} builder methods: empty optionals leave the member
+     * unchanged, present optionals replace the corresponding field, and all other fields are
+     * carried over from the original member.
+     */
+    @Test
+    public void testBuilderMaybeUpdateMember() {
+        final StreamsGroupMember member = createStreamsGroupMember();
+
+        // This is a no-op.
+        StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member)
+            .maybeUpdateRackId(Optional.empty())
+            .maybeUpdateInstanceId(Optional.empty())
+            .maybeUpdateRebalanceTimeoutMs(OptionalInt.empty())
+            .maybeUpdateProcessId(Optional.empty())
+            .maybeUpdateTopologyEpoch(OptionalInt.empty())
+            .maybeUpdateUserEndpoint(Optional.empty())
+            .maybeUpdateClientTags(Optional.empty())
+            .build();
+
+        assertEquals(member, updatedMember);
+
+        // Unwrap the Optionals before concatenating; otherwise the new ids would embed
+        // Optional.toString(), e.g. "newOptional[rack-id]".
+        final String newRackId = "new" + member.rackId().orElseThrow();
+        final String newInstanceId = "new" + member.instanceId().orElseThrow();
+        final Integer newRebalanceTimeout = member.rebalanceTimeoutMs() + 1000;
+        final String newProcessId = "new" + member.processId();
+        final Integer newTopologyEpoch = member.topologyEpoch() + 1;
+        final StreamsGroupMemberMetadataValue.Endpoint newUserEndpoint =
+            new StreamsGroupMemberMetadataValue.Endpoint()
+                .setHost(member.userEndpoint().orElseThrow().host() + "2")
+                .setPort(9090);
+        final Map<String, String> newClientTags = new HashMap<>(member.clientTags());
+        newClientTags.put("client2", "tag2");
+
+        updatedMember = new StreamsGroupMember.Builder(member)
+            .maybeUpdateRackId(Optional.of(newRackId))
+            .maybeUpdateInstanceId(Optional.of(newInstanceId))
+            // Pass the computed value rather than a literal so the update and the assertion
+            // below stay in sync if REBALANCE_TIMEOUT changes.
+            .maybeUpdateRebalanceTimeoutMs(OptionalInt.of(newRebalanceTimeout))
+            .maybeUpdateProcessId(Optional.of(newProcessId))
+            .maybeUpdateTopologyEpoch(OptionalInt.of(newTopologyEpoch))
+            .maybeUpdateUserEndpoint(Optional.of(newUserEndpoint))
+            .maybeUpdateClientTags(Optional.of(newClientTags))
+            .build();
+
+        // Updated fields.
+        assertEquals(Optional.of(newRackId), updatedMember.rackId());
+        assertEquals(Optional.of(newInstanceId), updatedMember.instanceId());
+        assertEquals(newRebalanceTimeout, updatedMember.rebalanceTimeoutMs());
+        assertEquals(newProcessId, updatedMember.processId());
+        assertEquals(newTopologyEpoch, updatedMember.topologyEpoch());
+        assertEquals(Optional.of(newUserEndpoint), updatedMember.userEndpoint());
+        assertEquals(newClientTags, updatedMember.clientTags());
+
+        // Fields that must be carried over unchanged.
+        assertEquals(member.memberId(), updatedMember.memberId());
+        assertEquals(member.memberEpoch(), updatedMember.memberEpoch());
+        assertEquals(member.previousMemberEpoch(), updatedMember.previousMemberEpoch());
+        assertEquals(member.state(), updatedMember.state());
+        assertEquals(member.clientId(), updatedMember.clientId());
+        assertEquals(member.clientHost(), updatedMember.clientHost());
+        assertEquals(member.assignedActiveTasks(), updatedMember.assignedActiveTasks());
+        assertEquals(member.assignedStandbyTasks(), updatedMember.assignedStandbyTasks());
+        assertEquals(member.assignedWarmupTasks(), updatedMember.assignedWarmupTasks());
+        assertEquals(member.activeTasksPendingRevocation(), updatedMember.activeTasksPendingRevocation());
+        assertEquals(member.standbyTasksPendingRevocation(), updatedMember.standbyTasksPendingRevocation());
+        assertEquals(member.warmupTasksPendingRevocation(), updatedMember.warmupTasksPendingRevocation());
+    }
+
+    @Test
+    public void testBuilderUpdateMemberEpoch() {
+        final StreamsGroupMember original = createStreamsGroupMember();
+        final int bumpedEpoch = original.memberEpoch() + 1;
+
+        final StreamsGroupMember bumped = new StreamsGroupMember.Builder(original)
+            .updateMemberEpoch(bumpedEpoch)
+            .build();
+
+        // The new epoch is applied and the old current epoch becomes the previous epoch.
+        assertEquals(bumpedEpoch, bumped.memberEpoch());
+        assertEquals(original.memberEpoch(), bumped.previousMemberEpoch());
+
+        // Everything else is carried over from the original member.
+        assertEquals(original.memberId(), bumped.memberId());
+        assertEquals(original.state(), bumped.state());
+        assertEquals(original.instanceId(), bumped.instanceId());
+        assertEquals(original.rackId(), bumped.rackId());
+        assertEquals(original.rebalanceTimeoutMs(), bumped.rebalanceTimeoutMs());
+        assertEquals(original.clientId(), bumped.clientId());
+        assertEquals(original.clientHost(), bumped.clientHost());
+        assertEquals(original.topologyEpoch(), bumped.topologyEpoch());
+        assertEquals(original.processId(), bumped.processId());
+        assertEquals(original.userEndpoint(), bumped.userEndpoint());
+        assertEquals(original.clientTags(), bumped.clientTags());
+        assertEquals(original.assignedActiveTasks(), bumped.assignedActiveTasks());
+        assertEquals(original.assignedStandbyTasks(), bumped.assignedStandbyTasks());
+        assertEquals(original.assignedWarmupTasks(), bumped.assignedWarmupTasks());
+        assertEquals(original.activeTasksPendingRevocation(), bumped.activeTasksPendingRevocation());
+        assertEquals(original.standbyTasksPendingRevocation(), bumped.standbyTasksPendingRevocation());
+        assertEquals(original.warmupTasksPendingRevocation(), bumped.warmupTasksPendingRevocation());
+    }
+
+    @Test
+    public void testReturnUnmodifiableFields() {
+        final StreamsGroupMember member = createStreamsGroupMember();
+        final Set<Integer> noTasks = Collections.emptySet();
+
+        // Every map exposed by the member must reject insertion.
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> member.clientTags().put("not allowed", ""));
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> member.assignedActiveTasks().put("not allowed", noTasks));
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> member.assignedStandbyTasks().put("not allowed", noTasks));
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> member.assignedWarmupTasks().put("not allowed", noTasks));
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> member.activeTasksPendingRevocation().put("not allowed", noTasks));
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> member.standbyTasksPendingRevocation().put("not allowed", noTasks));
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> member.warmupTasksPendingRevocation().put("not allowed", noTasks));
+    }
+
+    @Test
+    public void testAsStreamsGroupDescribeMember() {
+        final StreamsGroupMember member = createStreamsGroupMember();
+
+        // Target assignment: one subtopology per task role.
+        final List<Integer> targetActive = Arrays.asList(16, 17, 18);
+        final List<Integer> targetStandby = Arrays.asList(13, 14, 15);
+        final List<Integer> targetWarmup = Arrays.asList(10, 11, 12);
+        final Assignment targetAssignment = new Assignment(
+            mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(targetActive))),
+            mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(targetStandby))),
+            mkMap(mkEntry(SUBTOPOLOGY3, new HashSet<>(targetWarmup)))
+        );
+
+        // Expected description of the tasks the member currently owns.
+        final StreamsGroupDescribeResponseData.Assignment expectedCurrent =
+            new StreamsGroupDescribeResponseData.Assignment()
+                .setActiveTasks(List.of(
+                    new StreamsGroupDescribeResponseData.TaskIds()
+                        .setSubtopologyId(SUBTOPOLOGY1)
+                        .setPartitions(TASKS1)))
+                .setStandbyTasks(List.of(
+                    new StreamsGroupDescribeResponseData.TaskIds()
+                        .setSubtopologyId(SUBTOPOLOGY2)
+                        .setPartitions(TASKS2)))
+                .setWarmupTasks(List.of(
+                    new StreamsGroupDescribeResponseData.TaskIds()
+                        .setSubtopologyId(SUBTOPOLOGY1)
+                        .setPartitions(TASKS3)));
+
+        // Expected description of the target assignment passed in.
+        final StreamsGroupDescribeResponseData.Assignment expectedTarget =
+            new StreamsGroupDescribeResponseData.Assignment()
+                .setActiveTasks(List.of(
+                    new StreamsGroupDescribeResponseData.TaskIds()
+                        .setSubtopologyId(SUBTOPOLOGY1)
+                        .setPartitions(targetActive)))
+                .setStandbyTasks(List.of(
+                    new StreamsGroupDescribeResponseData.TaskIds()
+                        .setSubtopologyId(SUBTOPOLOGY2)
+                        .setPartitions(targetStandby)))
+                .setWarmupTasks(List.of(
+                    new StreamsGroupDescribeResponseData.TaskIds()
+                        .setSubtopologyId(SUBTOPOLOGY3)
+                        .setPartitions(targetWarmup)));
+
+        final StreamsGroupDescribeResponseData.Endpoint expectedEndpoint =
+            new StreamsGroupDescribeResponseData.Endpoint()
+                .setHost(USER_ENDPOINT.host())
+                .setPort(USER_ENDPOINT.port());
+
+        final StreamsGroupDescribeResponseData.Member expected = new StreamsGroupDescribeResponseData.Member()
+            .setMemberId(MEMBER_ID)
+            .setMemberEpoch(MEMBER_EPOCH)
+            .setClientId(CLIENT_ID)
+            .setInstanceId(INSTANCE_ID)
+            .setRackId(RACK_ID)
+            .setClientHost(HOSTNAME)
+            .setProcessId(PROCESS_ID)
+            .setTopologyEpoch(TOPOLOGY_EPOCH)
+            .setClientTags(List.of(
+                new StreamsGroupDescribeResponseData.KeyValue().setKey(CLIENT_TAG_KEY).setValue(CLIENT_TAG_VALUE)))
+            .setAssignment(expectedCurrent)
+            .setTargetAssignment(expectedTarget)
+            .setUserEndpoint(expectedEndpoint);
+
+        final StreamsGroupDescribeResponseData.Member actual = member.asStreamsGroupDescribeMember(targetAssignment);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testAsStreamsGroupDescribeWithTargetAssignmentNull() {
+        // A null target assignment must be described as an empty Assignment message.
+        final StreamsGroupDescribeResponseData.Assignment describedTarget = createStreamsGroupMember()
+            .asStreamsGroupDescribeMember(null)
+            .targetAssignment();
+
+        assertEquals(new StreamsGroupDescribeResponseData.Assignment(), describedTarget);
+    }
+
+ private StreamsGroupMember createStreamsGroupMember() {
+ return new StreamsGroupMember.Builder(MEMBER_ID)
+ .setMemberEpoch(MEMBER_EPOCH)
+ .setPreviousMemberEpoch(PREVIOUS_MEMBER_EPOCH)
+ .setState(STATE)
+ .setInstanceId(INSTANCE_ID)
+ .setRackId(RACK_ID)
+ .setRebalanceTimeoutMs(REBALANCE_TIMEOUT)
+ .setClientId(CLIENT_ID)
+ .setClientHost(HOSTNAME)
+ .setTopologyEpoch(TOPOLOGY_EPOCH)
+ .setProcessId(PROCESS_ID)
+ .setUserEndpoint(USER_ENDPOINT)
+ .setClientTags(CLIENT_TAGS)
+ .setAssignedActiveTasks(ASSIGNED_ACTIVE_TASKS)
+ .setAssignedStandbyTasks(ASSIGNED_STANDBY_TASKS)
+ .setAssignedWarmupTasks(ASSIGNED_WARMUP_TASKS)
+ .setActiveTasksPendingRevocation(ACTIVE_TASKS_PENDING_REVOCATION)
+ .setStandbyTasksPendingRevocation(STANDBY_TASKS_PENDING_REVOCATION)
+ .setWarmupTasksPendingRevocation(WARMUP_TASKS_PENDING_REVOCATION)
+ .build();
+ }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
new file mode 100644
index 00000000000..47668ec84c0
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Static helpers for building task assignments in tests.
+ */
+public class TaskAssignmentTestUtil {
+
+    // Static helpers only; instantiating this class serves no purpose.
+    private TaskAssignmentTestUtil() {
+    }
+
+    /**
+     * Creates an {@link Assignment} from the given task maps, each wrapped in an unmodifiable view.
+     *
+     * @param activeTasks  active task ids keyed by subtopology id
+     * @param standbyTasks standby task ids keyed by subtopology id
+     * @param warmupTasks  warm-up task ids keyed by subtopology id
+     * @return the assignment built from the three maps
+     * @throws NullPointerException if any of the maps is {@code null}
+     */
+    public static Assignment mkAssignment(final Map<String, Set<Integer>> activeTasks,
+                                          final Map<String, Set<Integer>> standbyTasks,
+                                          final Map<String, Set<Integer>> warmupTasks) {
+        return new Assignment(
+            Collections.unmodifiableMap(Objects.requireNonNull(activeTasks)),
+            Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks)),
+            Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks))
+        );
+    }
+
+    /**
+     * Creates a map entry that associates a subtopology id with a mutable set of task ids.
+     *
+     * @param subtopologyId the subtopology id used as the entry key
+     * @param tasks         the task ids; {@code List.of} rejects {@code null} elements
+     * @return an entry from the subtopology id to a {@link HashSet} of the task ids
+     */
+    public static Map.Entry<String, Set<Integer>> mkTasks(String subtopologyId,
+                                                          Integer... tasks) {
+        return new AbstractMap.SimpleEntry<>(
+            subtopologyId,
+            new HashSet<>(List.of(tasks))
+        );
+    }
+
+    /**
+     * Collects the given entries into a mutable map keyed by subtopology id.
+     *
+     * @param entries entries as produced by {@link #mkTasks(String, Integer...)}; if a key
+     *                occurs more than once, the later entry replaces the earlier one
+     * @return a new {@link HashMap} holding the entries
+     */
+    @SafeVarargs
+    public static Map<String, Set<Integer>> mkTasksPerSubtopology(Map.Entry<String, Set<Integer>>... entries) {
+        Map<String, Set<Integer>> assignment = new HashMap<>();
+        for (Map.Entry<String, Set<Integer>> entry : entries) {
+            assignment.put(entry.getKey(), entry.getValue());
+        }
+        return assignment;
+    }
+}