This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 60f6773fa631a4c6e2700e858c06584f31b32511
Author: Bruno Cadonna <[email protected]>
AuthorDate: Fri Jun 7 14:31:36 2024 +0200

    Add Assignment class in group coordinator for Streams
    
    See https://github.com/lucasbru/kafka/pull/17
---
 .../coordinator/group/streams/Assignment.java      | 124 +++++++++++++++
 .../coordinator/group/streams/MemberState.java     |  77 +++++++++
 .../kafka/coordinator/group/streams/TopicIds.java  | 176 +++++++++++++++++++++
 .../coordinator/group/streams/TopicMetadata.java   | 154 ++++++++++++++++++
 .../coordinator/group/AssignmentTestUtil.java      |  19 +++
 .../coordinator/group/streams/AssignmentTest.java  | 134 ++++++++++++++++
 6 files changed, 684 insertions(+)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
new file mode 100644
index 00000000000..735ee414785
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable assignment for a member.
+ *
+ * <p>Each of the three task maps is keyed by subtopology and holds the set of partitions
+ * assigned for that subtopology. The maps given to the constructor are wrapped in unmodifiable
+ * views rather than copied, so callers must not modify them after construction.
+ */
+public class Assignment {
+    /**
+     * An assignment without any active, standby or warm-up tasks.
+     */
+    public static final Assignment EMPTY = new Assignment(
+        Collections.emptyMap(),
+        Collections.emptyMap(),
+        Collections.emptyMap()
+    );
+
+    private final Map<String, Set<Integer>> activeTasks;
+    private final Map<String, Set<Integer>> standbyTasks;
+    private final Map<String, Set<Integer>> warmupTasks;
+
+    /**
+     * @param activeTasks  The assigned active tasks.
+     * @param standbyTasks The assigned standby tasks.
+     * @param warmupTasks  The assigned warm-up tasks.
+     * @throws NullPointerException if any of the maps is null.
+     */
+    public Assignment(final Map<String, Set<Integer>> activeTasks,
+                      final Map<String, Set<Integer>> standbyTasks,
+                      final Map<String, Set<Integer>> warmupTasks) {
+        this.activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
+        this.standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
+        this.warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));
+    }
+
+    /**
+     * @return The assigned active tasks.
+     */
+    public Map<String, Set<Integer>> activeTasks() {
+        return activeTasks;
+    }
+
+    /**
+     * @return The assigned standby tasks.
+     */
+    public Map<String, Set<Integer>> standbyTasks() {
+        return standbyTasks;
+    }
+
+    /**
+     * @return The assigned warm-up tasks.
+     */
+    public Map<String, Set<Integer>> warmupTasks() {
+        return warmupTasks;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Assignment that = (Assignment) o;
+        return Objects.equals(activeTasks, that.activeTasks)
+            && Objects.equals(standbyTasks, that.standbyTasks)
+            && Objects.equals(warmupTasks, that.warmupTasks);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(activeTasks, standbyTasks, warmupTasks);
+    }
+
+    @Override
+    public String toString() {
+        return "Assignment(active tasks=" + activeTasks +
+            ", standby tasks=" + standbyTasks +
+            ", warm-up tasks=" + warmupTasks + ')';
+    }
+
+    /**
+     * Creates a {@link org.apache.kafka.coordinator.group.streams.Assignment} from a
+     * {@link org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue}.
+     *
+     * <p>The partition sets of the returned assignment are unmodifiable copies of the
+     * partitions stored in the record.
+     *
+     * @param record The record.
+     * @return A {@link org.apache.kafka.coordinator.group.streams.Assignment}.
+     * @throws IllegalStateException if one of the task lists of the record contains the same
+     *                               subtopology more than once.
+     */
+    public static Assignment fromRecord(
+        StreamsGroupTargetAssignmentMemberValue record
+    ) {
+        return new Assignment(
+            toTaskMap(record.activeTasks()),
+            toTaskMap(record.standbyTasks()),
+            toTaskMap(record.warmupTasks())
+        );
+    }
+
+    /**
+     * Converts the task ids of a record into a map from subtopology to an unmodifiable set of
+     * partitions.
+     */
+    private static Map<String, Set<Integer>> toTaskMap(
+        final Collection<StreamsGroupTargetAssignmentMemberValue.TaskId> taskIds
+    ) {
+        // Collectors.toMap without a merge function rejects duplicate subtopologies.
+        return taskIds.stream()
+            .collect(Collectors.toMap(
+                StreamsGroupTargetAssignmentMemberValue.TaskId::subtopology,
+                taskId -> Collections.unmodifiableSet(new HashSet<>(taskId.partitions()))
+            ));
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java
new file mode 100644
index 00000000000..53518e61934
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The various states that a member can be in. For their definition,
+ * refer to the documentation of
+ * {@link org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder}.
+ */
+public enum MemberState {
+
+    /**
+     * The member is fully reconciled with the desired target assignment.
+     */
+    STABLE((byte) 1),
+
+    /**
+     * The member must revoke some tasks in order to be able to
+     * transition to the next epoch.
+     */
+    UNREVOKED_TASKS((byte) 2),
+
+    /**
+     * The member transitioned to the last epoch but waits on some
+     * tasks which have not been revoked by their previous
+     * owners yet.
+     */
+    UNRELEASED_TASKS((byte) 3),
+
+    /**
+     * The member is in an unknown state. This can only happen if a future
+     * version of the software introduces a new state unknown by this version.
+     */
+    UNKNOWN((byte) 127);
+
+    /**
+     * Lookup table from the byte value of a state to the state itself.
+     */
+    private static final Map<Byte, MemberState> VALUES_TO_ENUMS = new HashMap<>();
+
+    static {
+        for (MemberState state : MemberState.values()) {
+            VALUES_TO_ENUMS.put(state.value(), state);
+        }
+    }
+
+    private final byte value;
+
+    MemberState(byte value) {
+        this.value = value;
+    }
+
+    /**
+     * @return The byte value of this state.
+     */
+    public byte value() {
+        return value;
+    }
+
+    /**
+     * Resolves a byte value to its state.
+     *
+     * @param value The byte value of the state.
+     * @return The state with the given value, or {@link #UNKNOWN} if no state has that value.
+     */
+    public static MemberState fromValue(byte value) {
+        return VALUES_TO_ENUMS.getOrDefault(value, UNKNOWN);
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicIds.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicIds.java
new file mode 100644
index 00000000000..7beb9b9e4ff
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicIds.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * TopicIds is initialized with topic names (String) but exposes a Set of topic ids (Uuid) to the
+ * user and performs the conversion lazily with TopicsImage.
+ *
+ * <p>The set is read-only: every mutating operation, as well as both {@code toArray} variants,
+ * throws {@link UnsupportedOperationException}.
+ *
+ * <p>Note that {@link #size()} and {@link #isEmpty()} are answered from the topic names, whereas
+ * {@link #iterator()} skips names that the image cannot resolve. The reported size can therefore
+ * be larger than the number of ids actually iterated.
+ */
+public class TopicIds implements Set<Uuid> {
+    private final Set<String> topicNames;
+    private final TopicsImage image;
+
+    /**
+     * @param topicNames The names of the topics.
+     * @param image      The topics image used to convert between topic names and topic ids.
+     * @throws NullPointerException if any argument is null.
+     */
+    public TopicIds(
+        Set<String> topicNames,
+        TopicsImage image
+    ) {
+        this.topicNames = Objects.requireNonNull(topicNames);
+        this.image = Objects.requireNonNull(image);
+    }
+
+    /**
+     * @return The number of topic names, whether or not the image can resolve them.
+     */
+    @Override
+    public int size() {
+        return topicNames.size();
+    }
+
+    /**
+     * @return True if there are no topic names.
+     */
+    @Override
+    public boolean isEmpty() {
+        return topicNames.isEmpty();
+    }
+
+    /**
+     * @return True if the given object is a topic id that the image resolves to one of the
+     *         topic names; false otherwise.
+     */
+    @Override
+    public boolean contains(Object o) {
+        if (o instanceof Uuid) {
+            Uuid topicId = (Uuid) o;
+            TopicImage topicImage = image.getTopic(topicId);
+            if (topicImage == null) return false;
+            return topicNames.contains(topicImage.name());
+        }
+        return false;
+    }
+
+    /**
+     * Iterates over the topic names and converts them to topic ids on the fly. Names that the
+     * image cannot resolve are skipped.
+     */
+    private static class TopicIdIterator implements Iterator<Uuid> {
+        final Iterator<String> iterator;
+        final TopicsImage image;
+        // The id found by hasNext() that next() has not handed out yet; null if there is none.
+        private Uuid next = null;
+
+        private TopicIdIterator(
+            Iterator<String> iterator,
+            TopicsImage image
+        ) {
+            this.iterator = Objects.requireNonNull(iterator);
+            this.image = Objects.requireNonNull(image);
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (next != null) return true;
+            // Advance until a name resolves to an id or the names are exhausted.
+            while (iterator.hasNext()) {
+                String topicName = iterator.next();
+                TopicImage topicImage = image.getTopic(topicName);
+                if (topicImage != null) {
+                    Uuid topicId = topicImage.id();
+                    if (topicId != null) {
+                        next = topicId;
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+
+        @Override
+        public Uuid next() {
+            if (!hasNext()) throw new NoSuchElementException();
+            Uuid result = next;
+            next = null;
+            return result;
+        }
+    }
+
+    @Override
+    public Iterator<Uuid> iterator() {
+        return new TopicIdIterator(topicNames.iterator(), image);
+    }
+
+    @Override
+    public Object[] toArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean add(Uuid o) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends Uuid> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        for (Object o : c) {
+            if (!contains(o)) return false;
+        }
+        return true;
+    }
+
+    /**
+     * Two TopicIds are equal if they have equal topic names and equal images. A TopicIds is
+     * never equal to an instance of another class, including other {@link Set} implementations.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        TopicIds uuids = (TopicIds) o;
+
+        if (!Objects.equals(topicNames, uuids.topicNames)) return false;
+        return Objects.equals(image, uuids.image);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = topicNames.hashCode();
+        result = 31 * result + image.hashCode();
+        return result;
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
new file mode 100644
index 00000000000..b03afea1be1
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Immutable topic metadata.
+ */
+public class TopicMetadata {
+    /**
+     * The topic id.
+     */
+    private final Uuid id;
+
+    /**
+     * The topic name.
+     */
+    private final String name;
+
+    /**
+     * The number of partitions.
+     */
+    private final int numPartitions;
+
+    /**
+     * Map of every partition Id to a set of its rack Ids, if they exist.
+     * If rack information is unavailable for all partitions, this is an empty map.
+     */
+    private final Map<Integer, Set<String>> partitionRacks;
+
+    /**
+     * @param id             The topic id; must be neither null nor the zero UUID.
+     * @param name           The topic name; must be neither null nor empty.
+     * @param numPartitions  The number of partitions; must not be negative.
+     * @param partitionRacks The rack ids per partition; must not be null.
+     */
+    public TopicMetadata(
+        Uuid id,
+        String name,
+        int numPartitions,
+        Map<Integer, Set<String>> partitionRacks
+    ) {
+        // Validate the arguments one after the other, then assign the fields.
+        Objects.requireNonNull(id);
+        if (Uuid.ZERO_UUID.equals(id)) {
+            throw new IllegalArgumentException("Topic id cannot be ZERO_UUID.");
+        }
+        Objects.requireNonNull(name);
+        if (name.isEmpty()) {
+            throw new IllegalArgumentException("Topic name cannot be empty.");
+        }
+        if (numPartitions < 0) {
+            throw new IllegalArgumentException("Number of partitions cannot be negative.");
+        }
+        Objects.requireNonNull(partitionRacks);
+
+        this.id = id;
+        this.name = name;
+        this.numPartitions = numPartitions;
+        this.partitionRacks = partitionRacks;
+    }
+
+    /**
+     * @return The topic id.
+     */
+    public Uuid id() {
+        return id;
+    }
+
+    /**
+     * @return The topic name.
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * @return The number of partitions.
+     */
+    public int numPartitions() {
+        return numPartitions;
+    }
+
+    /**
+     * @return Every partition mapped to the set of corresponding available rack Ids of its
+     *         replicas. An empty map is returned if rack information is unavailable for all
+     *         partitions.
+     */
+    public Map<Integer, Set<String>> partitionRacks() {
+        return partitionRacks;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        final TopicMetadata other = (TopicMetadata) o;
+        return id.equals(other.id)
+            && name.equals(other.name)
+            && numPartitions == other.numPartitions
+            && partitionRacks.equals(other.partitionRacks);
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 31 * id.hashCode() + name.hashCode();
+        hash = 31 * hash + numPartitions;
+        return 31 * hash + partitionRacks.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "TopicMetadata(" + "id=" + id
+            + ", name=" + name
+            + ", numPartitions=" + numPartitions
+            + ", partitionRacks=" + partitionRacks
+            + ')';
+    }
+
+    /**
+     * Creates a TopicMetadata from the topic metadata stored in a record.
+     *
+     * @param record The record.
+     * @return The topic metadata.
+     */
+    public static TopicMetadata fromRecord(
+        StreamsGroupPartitionMetadataValue.TopicMetadata record
+    ) {
+        // The record stores the partitions as a list; the topic metadata needs a map.
+        final Map<Integer, Set<String>> racksByPartition = new HashMap<>();
+        record.partitionMetadata().forEach(metadata ->
+            racksByPartition.put(
+                metadata.partition(),
+                Collections.unmodifiableSet(new HashSet<>(metadata.racks()))
+            )
+        );
+
+        return new TopicMetadata(
+            record.topicId(),
+            record.topicName(),
+            record.numPartitions(),
+            racksByPartition
+        );
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
index 489d0d1fdd0..9759f41d727 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
@@ -71,6 +71,25 @@ public class AssignmentTestUtil {
         return Collections.unmodifiableMap(assignment);
     }
 
+    /**
+     * Builds a streams assignment map from the given entries.
+     *
+     * Every entry's key and value are put into a new HashMap, so a later entry replaces an
+     * earlier one with the same key. The returned map is the plain HashMap itself and is
+     * therefore mutable.
+     *
+     * @param entries The entries, e.g. as created by mkTaskAssignment.
+     * @return A map containing the given entries.
+     */
+    @SafeVarargs
+    public static Map<String, Set<Integer>> 
mkStreamsAssignment(Map.Entry<String, Set<Integer>>... entries) {
+        Map<String, Set<Integer>> assignment = new HashMap<>();
+        for (Map.Entry<String, Set<Integer>> entry : entries) {
+            assignment.put(entry.getKey(), entry.getValue());
+        }
+        return assignment;
+    }
+
+    /**
+     * Builds a single map entry that pairs a subtopology with a set of partitions.
+     *
+     * @param subtopology The key of the entry.
+     * @param partitions  The partitions; they are collected into a new HashSet, so duplicates
+     *                    collapse into one element.
+     * @return An entry from the subtopology to the set of partitions.
+     */
+    public static Map.Entry<String, Set<Integer>> mkTaskAssignment(
+        String subtopology,
+        Integer... partitions
+    ) {
+        return new AbstractMap.SimpleEntry<>(
+            subtopology,
+            new HashSet<>(Arrays.asList(partitions))
+        );
+    }
+
     /**
      * Verifies that the expected assignment is equal to the computed 
assignment for every member in the group.
      */
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
new file mode 100644
index 00000000000..baff38bceaf
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkStreamsAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTaskAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Unit tests for {@link Assignment}: constructor null checks, accessors, conversion from a
+ * target assignment record, and equality.
+ */
+public class AssignmentTest {
+
+    /**
+     * The constructor must throw a NullPointerException if any of the three task maps is null.
+     */
+    @Test
+    public void testTasksCannotBeNull() {
+        assertThrows(NullPointerException.class, () -> new Assignment(null, 
Collections.emptyMap(), Collections.emptyMap()));
+        assertThrows(NullPointerException.class, () -> new 
Assignment(Collections.emptyMap(), null, Collections.emptyMap()));
+        assertThrows(NullPointerException.class, () -> new 
Assignment(Collections.emptyMap(), Collections.emptyMap(), null));
+    }
+
+    /**
+     * The accessors must return maps equal to the ones passed to the constructor.
+     */
+    @Test
+    public void testAttributes() {
+        Map<String, Set<Integer>> activeTasks = mkStreamsAssignment(
+            mkTaskAssignment("subtopology1", 1, 2, 3)
+        );
+        Map<String, Set<Integer>> standbyTasks = mkStreamsAssignment(
+            mkTaskAssignment("subtopology2", 9, 8, 7)
+        );
+        Map<String, Set<Integer>> warmupTasks = mkStreamsAssignment(
+            mkTaskAssignment("subtopology3", 4, 5, 6)
+        );
+        Assignment assignment = new Assignment(activeTasks, standbyTasks, 
warmupTasks);
+
+        assertEquals(activeTasks, assignment.activeTasks());
+        assertEquals(standbyTasks, assignment.standbyTasks());
+        assertEquals(warmupTasks, assignment.warmupTasks());
+    }
+
+    /**
+     * Builds a record with two subtopologies in each of the active, standby and warm-up task
+     * lists and checks that fromRecord converts each list into the expected map from
+     * subtopology to partitions.
+     */
+    @Test
+    public void testFromTargetAssignmentRecord() {
+        String subtopology1 = "subtopology1";
+        String subtopology2 = "subtopology2";
+        List<StreamsGroupTargetAssignmentMemberValue.TaskId> activeTasks = new 
ArrayList<>();
+        activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+            .setSubtopology(subtopology1)
+            .setPartitions(Arrays.asList(1, 2, 3)));
+        activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+            .setSubtopology(subtopology2)
+            .setPartitions(Arrays.asList(4, 5, 6)));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskId> standbyTasks = 
new ArrayList<>();
+        standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+            .setSubtopology(subtopology1)
+            .setPartitions(Arrays.asList(7, 8, 9)));
+        standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+            .setSubtopology(subtopology2)
+            .setPartitions(Arrays.asList(1, 2, 3)));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskId> warmupTasks = new 
ArrayList<>();
+        warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+            .setSubtopology(subtopology1)
+            .setPartitions(Arrays.asList(4, 5, 6)));
+        warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId()
+            .setSubtopology(subtopology2)
+            .setPartitions(Arrays.asList(7, 8, 9)));
+
+        StreamsGroupTargetAssignmentMemberValue record = new 
StreamsGroupTargetAssignmentMemberValue()
+            .setActiveTasks(activeTasks)
+            .setStandbyTasks(standbyTasks)
+            .setWarmupTasks(warmupTasks);
+
+        Assignment assignment = Assignment.fromRecord(record);
+
+        assertEquals(
+            mkStreamsAssignment(
+                mkTaskAssignment(subtopology1, 1, 2, 3),
+                mkTaskAssignment(subtopology2, 4, 5, 6)
+            ),
+            assignment.activeTasks()
+        );
+        assertEquals(
+            mkStreamsAssignment(
+                mkTaskAssignment(subtopology1, 7, 8, 9),
+                mkTaskAssignment(subtopology2, 1, 2, 3)
+            ),
+            assignment.standbyTasks()
+        );
+        assertEquals(
+            mkStreamsAssignment(
+                mkTaskAssignment(subtopology1, 4, 5, 6),
+                mkTaskAssignment(subtopology2, 7, 8, 9)
+            ),
+            assignment.warmupTasks()
+        );
+    }
+
+    /**
+     * Two assignments created from the same task maps must be equal. Only the positive case
+     * is covered here; inequality is not asserted.
+     */
+    @Test
+    public void testEquals() {
+        Map<String, Set<Integer>> activeTasks = mkStreamsAssignment(
+            mkTaskAssignment("subtopology1", 1, 2, 3)
+        );
+        Map<String, Set<Integer>> standbyTasks = mkStreamsAssignment(
+            mkTaskAssignment("subtopology2", 9, 8, 7)
+        );
+        Map<String, Set<Integer>> warmupTasks = mkStreamsAssignment(
+            mkTaskAssignment("subtopology3", 4, 5, 6)
+        );
+
+        assertEquals(
+            new Assignment(activeTasks, standbyTasks, warmupTasks),
+            new Assignment(activeTasks, standbyTasks, warmupTasks)
+        );
+    }
+}

Reply via email to