This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 60f6773fa631a4c6e2700e858c06584f31b32511 Author: Bruno Cadonna <[email protected]> AuthorDate: Fri Jun 7 14:31:36 2024 +0200 Add Assignment class in group coordinator for Streams See https://github.com/lucasbru/kafka/pull/17 --- .../coordinator/group/streams/Assignment.java | 124 +++++++++++++++ .../coordinator/group/streams/MemberState.java | 77 +++++++++ .../kafka/coordinator/group/streams/TopicIds.java | 176 +++++++++++++++++++++ .../coordinator/group/streams/TopicMetadata.java | 154 ++++++++++++++++++ .../coordinator/group/AssignmentTestUtil.java | 19 +++ .../coordinator/group/streams/AssignmentTest.java | 134 ++++++++++++++++ 6 files changed, 684 insertions(+) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java new file mode 100644 index 00000000000..735ee414785 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * An immutable assignment for a member. + */ +public class Assignment { + public static final Assignment EMPTY = new Assignment( + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + private final Map<String, Set<Integer>> activeTasks; + private final Map<String, Set<Integer>> standbyTasks; + private final Map<String, Set<Integer>> warmupTasks; + + public Assignment(final Map<String, Set<Integer>> activeTasks, + final Map<String, Set<Integer>> standbyTasks, + final Map<String, Set<Integer>> warmupTasks) { + this.activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks)); + this.standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks)); + this.warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks)); + } + + /** + * @return The assigned active tasks. + */ + public Map<String, Set<Integer>> activeTasks() { + return activeTasks; + } + + /** + * @return The assigned standby tasks. + */ + public Map<String, Set<Integer>> standbyTasks() { + return standbyTasks; + } + + /** + * @return The assigned warm-up tasks. + */ + public Map<String, Set<Integer>> warmupTasks() { + return warmupTasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Assignment that = (Assignment) o; + return Objects.equals(activeTasks, that.activeTasks) + && Objects.equals(standbyTasks, that.standbyTasks) + && Objects.equals(warmupTasks, that.warmupTasks); + } + + @Override + public int hashCode() { + return Objects.hash(activeTasks, standbyTasks, warmupTasks); + } + + @Override + public String toString() { + return "Assignment(active tasks=" + activeTasks + + ", standby tasks=" + standbyTasks + + ", warm-up tasks=" + warmupTasks +')'; + } + + /** + * Creates a {{@link org.apache.kafka.coordinator.group.streams.Assignment}} from a + * {{@link org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue}}. + * + * @param record The record. + * @return A {{@link org.apache.kafka.coordinator.group.streams.Assignment}}. + */ + public static Assignment fromRecord( + StreamsGroupTargetAssignmentMemberValue record + ) { + return new Assignment( + record.activeTasks().stream() + .collect(Collectors.toMap( + StreamsGroupTargetAssignmentMemberValue.TaskId::subtopology, + taskId -> new HashSet<>(taskId.partitions()) + ) + ), + record.standbyTasks().stream() + .collect(Collectors.toMap( + StreamsGroupTargetAssignmentMemberValue.TaskId::subtopology, + taskId -> new HashSet<>(taskId.partitions()) + ) + ), + record.warmupTasks().stream() + .collect(Collectors.toMap( + StreamsGroupTargetAssignmentMemberValue.TaskId::subtopology, + taskId -> new HashSet<>(taskId.partitions()) + ) + ) + ); + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java new file mode 100644 index 00000000000..53518e61934 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import java.util.HashMap; +import java.util.Map; + +/** + * The various states that a member can be in. For their definition, + * refer to the documentation of {{@link org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder}}. + */ +public enum MemberState { + + /** + * The member is fully reconciled with the desired target assignment. + */ + STABLE((byte) 1), + + /** + * The member must revoke some tasks in order to be able to + * transition to the next epoch. + */ + UNREVOKED_TASKS((byte) 2), + + /** + * The member transitioned to the last epoch but waits on some + * tasks which have not been revoked by their previous + * owners yet. + */ + UNRELEASED_TASKS((byte) 3), + + /** + * The member is in an unknown state. This can only happen if a future + * version of the software introduces a new state unknown by this version. + */ + UNKNOWN((byte) 127); + + private final static Map<Byte, MemberState> VALUES_TO_ENUMS = new HashMap<>(); + + static { + for (MemberState state: MemberState.values()) { + VALUES_TO_ENUMS.put(state.value(), state); + } + } + + private final byte value; + + MemberState(byte value) { + this.value = value; + } + + public byte value() { + return value; + } + + public static MemberState fromValue(byte value) { + MemberState state = VALUES_TO_ENUMS.get(value); + if (state == null) { + return UNKNOWN; + } + return state; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicIds.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicIds.java new file mode 100644 index 00000000000..7beb9b9e4ff --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicIds.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; + +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; + +/** + * TopicIds is initialized with topic names (String) but exposes a Set of topic ids (Uuid) to the + * user and performs the conversion lazily with TopicsImage. + */ +public class TopicIds implements Set<Uuid> { + private final Set<String> topicNames; + private final TopicsImage image; + + public TopicIds( + Set<String> topicNames, + TopicsImage image + ) { + this.topicNames = Objects.requireNonNull(topicNames); + this.image = Objects.requireNonNull(image); + } + + @Override + public int size() { + return topicNames.size(); + } + + @Override + public boolean isEmpty() { + return topicNames.isEmpty(); + } + + @Override + public boolean contains(Object o) { + if (o instanceof Uuid) { + Uuid topicId = (Uuid) o; + TopicImage topicImage = image.getTopic(topicId); + if (topicImage == null) return false; + return topicNames.contains(topicImage.name()); + } + return false; + } + + private static class TopicIdIterator implements Iterator<Uuid> { + final Iterator<String> iterator; + final TopicsImage image; + private Uuid next = null; + + private TopicIdIterator( + Iterator<String> iterator, + TopicsImage image + ) { + this.iterator = Objects.requireNonNull(iterator); + this.image = Objects.requireNonNull(image); + } + + @Override + public boolean hasNext() { + if (next != null) return true; + Uuid result = null; + do { + if (!iterator.hasNext()) { + return false; + } + String next = iterator.next(); + TopicImage topicImage = image.getTopic(next); + if (topicImage != null) { + result = topicImage.id(); + } + } while (result == null); + next = result; + return true; + } + + @Override + public Uuid next() { + if (!hasNext()) throw new NoSuchElementException(); + Uuid result = next; + next = null; + return result; + } + } + + @Override + public Iterator<Uuid> iterator() { + return new TopicIdIterator(topicNames.iterator(), image); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public <T> T[] toArray(T[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(Uuid o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + for (Object o : c) { + if (!contains(o)) return false; + } + return true; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TopicIds uuids = (TopicIds) o; + + if (!Objects.equals(topicNames, uuids.topicNames)) return false; + return Objects.equals(image, uuids.image); + } + + @Override + public int hashCode() { + int result = topicNames.hashCode(); + result = 31 * result + image.hashCode(); + return result; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java new file mode 100644 index 00000000000..b03afea1be1 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Immutable topic metadata. + */ +public class TopicMetadata { + /** + * The topic id. + */ + private final Uuid id; + + /** + * The topic name. + */ + private final String name; + + /** + * The number of partitions. + */ + private final int numPartitions; + + /** + * Map of every partition Id to a set of its rack Ids, if they exist. + * If rack information is unavailable for all partitions, this is an empty map. + */ + private final Map<Integer, Set<String>> partitionRacks; + + public TopicMetadata( + Uuid id, + String name, + int numPartitions, + Map<Integer, Set<String>> partitionRacks + ) { + this.id = Objects.requireNonNull(id); + if (Uuid.ZERO_UUID.equals(id)) { + throw new IllegalArgumentException("Topic id cannot be ZERO_UUID."); + } + this.name = Objects.requireNonNull(name); + if (name.isEmpty()) { + throw new IllegalArgumentException("Topic name cannot be empty."); + } + this.numPartitions = numPartitions; + if (numPartitions < 0) { + throw new IllegalArgumentException("Number of partitions cannot be negative."); + } + this.partitionRacks = Objects.requireNonNull(partitionRacks); + } + + /** + * @return The topic id. + */ + public Uuid id() { + return this.id; + } + + /** + * @return The topic name. + */ + public String name() { + return this.name; + } + + /** + * @return The number of partitions. + */ + public int numPartitions() { + return this.numPartitions; + } + + /** + * @return Every partition mapped to the set of corresponding available rack Ids of its replicas. + * An empty map is returned if rack information is unavailable for all partitions. + */ + public Map<Integer, Set<String>> partitionRacks() { + return this.partitionRacks; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TopicMetadata that = (TopicMetadata) o; + + if (!id.equals(that.id)) return false; + if (!name.equals(that.name)) return false; + if (numPartitions != that.numPartitions) return false; + return partitionRacks.equals(that.partitionRacks); + } + + @Override + public int hashCode() { + int result = id.hashCode(); + result = 31 * result + name.hashCode(); + result = 31 * result + numPartitions; + result = 31 * result + partitionRacks.hashCode(); + return result; + } + + @Override + public String toString() { + return "TopicMetadata(" + + "id=" + id + + ", name=" + name + + ", numPartitions=" + numPartitions + + ", partitionRacks=" + partitionRacks + + ')'; + } + + public static TopicMetadata fromRecord( + StreamsGroupPartitionMetadataValue.TopicMetadata record + ) { + // Converting the data type from a list stored in the record to a map for the topic metadata. + Map<Integer, Set<String>> partitionRacks = new HashMap<>(); + for (StreamsGroupPartitionMetadataValue.PartitionMetadata partitionMetadata : record.partitionMetadata()) { + partitionRacks.put( + partitionMetadata.partition(), + Collections.unmodifiableSet(new HashSet<>(partitionMetadata.racks())) + ); + } + + return new TopicMetadata( + record.topicId(), + record.topicName(), + record.numPartitions(), + partitionRacks); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java index 489d0d1fdd0..9759f41d727 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java @@ -71,6 +71,25 @@ public class AssignmentTestUtil { return Collections.unmodifiableMap(assignment); } + @SafeVarargs + public static Map<String, Set<Integer>> mkStreamsAssignment(Map.Entry<String, Set<Integer>>... entries) { + Map<String, Set<Integer>> assignment = new HashMap<>(); + for (Map.Entry<String, Set<Integer>> entry : entries) { + assignment.put(entry.getKey(), entry.getValue()); + } + return assignment; + } + + public static Map.Entry<String, Set<Integer>> mkTaskAssignment( + String subtopology, + Integer... partitions + ) { + return new AbstractMap.SimpleEntry<>( + subtopology, + new HashSet<>(Arrays.asList(partitions)) + ); + } + /** * Verifies that the expected assignment is equal to the computed assignment for every member in the group. */ diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java new file mode 100644 index 00000000000..baff38bceaf --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkStreamsAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTaskAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class AssignmentTest { + + @Test + public void testTasksCannotBeNull() { + assertThrows(NullPointerException.class, () -> new Assignment(null, Collections.emptyMap(), Collections.emptyMap())); + assertThrows(NullPointerException.class, () -> new Assignment(Collections.emptyMap(), null, Collections.emptyMap())); + assertThrows(NullPointerException.class, () -> new Assignment(Collections.emptyMap(), Collections.emptyMap(), null)); + } + + @Test + public void testAttributes() { + Map<String, Set<Integer>> activeTasks = mkStreamsAssignment( + mkTaskAssignment("subtopology1", 1, 2, 3) + ); + Map<String, Set<Integer>> standbyTasks = mkStreamsAssignment( + mkTaskAssignment("subtopology2", 9, 8, 7) + ); + Map<String, Set<Integer>> warmupTasks = mkStreamsAssignment( + mkTaskAssignment("subtopology3", 4, 5, 6) + ); + Assignment assignment = new Assignment(activeTasks, standbyTasks, warmupTasks); + + assertEquals(activeTasks, assignment.activeTasks()); + assertEquals(standbyTasks, assignment.standbyTasks()); + assertEquals(warmupTasks, assignment.warmupTasks()); + } + + @Test + public void testFromTargetAssignmentRecord() { + String subtopology1 = "subtopology1"; + String subtopology2 = "subtopology2"; + List<StreamsGroupTargetAssignmentMemberValue.TaskId> activeTasks = new ArrayList<>(); + activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId() + .setSubtopology(subtopology1) + .setPartitions(Arrays.asList(1, 2, 3))); + activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId() + .setSubtopology(subtopology2) + .setPartitions(Arrays.asList(4, 5, 6))); + List<StreamsGroupTargetAssignmentMemberValue.TaskId> standbyTasks = new ArrayList<>(); + standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId() + .setSubtopology(subtopology1) + .setPartitions(Arrays.asList(7, 8, 9))); + standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId() + .setSubtopology(subtopology2) + .setPartitions(Arrays.asList(1, 2, 3))); + List<StreamsGroupTargetAssignmentMemberValue.TaskId> warmupTasks = new ArrayList<>(); + warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId() + .setSubtopology(subtopology1) + .setPartitions(Arrays.asList(4, 5, 6))); + warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskId() + .setSubtopology(subtopology2) + .setPartitions(Arrays.asList(7, 8, 9))); + + StreamsGroupTargetAssignmentMemberValue record = new StreamsGroupTargetAssignmentMemberValue() + .setActiveTasks(activeTasks) + .setStandbyTasks(standbyTasks) + .setWarmupTasks(warmupTasks); + + Assignment assignment = Assignment.fromRecord(record); + + assertEquals( + mkStreamsAssignment( + mkTaskAssignment(subtopology1, 1, 2, 3), + mkTaskAssignment(subtopology2, 4, 5, 6) + ), + assignment.activeTasks() + ); + assertEquals( + mkStreamsAssignment( + mkTaskAssignment(subtopology1, 7, 8, 9), + mkTaskAssignment(subtopology2, 1, 2, 3) + ), + assignment.standbyTasks() + ); + assertEquals( + mkStreamsAssignment( + mkTaskAssignment(subtopology1, 4, 5, 6), + mkTaskAssignment(subtopology2, 7, 8, 9) + ), + assignment.warmupTasks() + ); + } + + @Test + public void testEquals() { + Map<String, Set<Integer>> activeTasks = mkStreamsAssignment( + mkTaskAssignment("subtopology1", 1, 2, 3) + ); + Map<String, Set<Integer>> standbyTasks = mkStreamsAssignment( + mkTaskAssignment("subtopology2", 9, 8, 7) + ); + Map<String, Set<Integer>> warmupTasks = mkStreamsAssignment( + mkTaskAssignment("subtopology3", 4, 5, 6) + ); + + assertEquals( + new Assignment(activeTasks, standbyTasks, warmupTasks), + new Assignment(activeTasks, standbyTasks, warmupTasks) + ); + } +}
