This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4ca24a7dbf8 KAFKA-18325: Add TargetAssignmentBuilder (#18676)
4ca24a7dbf8 is described below
commit 4ca24a7dbf84b83b6693442d416a111ba8e53caf
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Feb 3 17:35:28 2025 +0100
KAFKA-18325: Add TargetAssignmentBuilder (#18676)
A class to build a new target assignment based on the provided parameters.
As a result, it yields the records that must be persisted to the log and the
new member assignments as a map.
Compared to the feature branch, I extended the unit tests (testing also
standby and warm-up task logic) and adopted simplifications due to the
TasksTuple class.
Reviewers: Bruno Cadonna <[email protected]>, Bill Bejeck
<[email protected]>
---
.../group/streams/TargetAssignmentBuilder.java | 351 +++++++++
.../group/streams/TopologyMetadata.java | 105 +++
.../group/streams/assignor/MockAssignor.java | 2 +-
.../group/streams/assignor/StickyTaskAssignor.java | 4 +-
.../group/streams/assignor/TopologyDescriber.java | 18 +-
.../group/streams/topics/ConfiguredTopology.java | 3 +-
.../group/streams/topics/InternalTopicManager.java | 12 +-
.../group/streams/TargetAssignmentBuilderTest.java | 852 +++++++++++++++++++++
.../group/streams/TopologyMetadataTest.java | 126 +++
.../group/streams/assignor/MockAssignorTest.java | 2 +-
.../streams/assignor/StickyTaskAssignorTest.java | 4 +-
.../streams/topics/ConfiguredTopologyTest.java | 10 +-
12 files changed, 1468 insertions(+), 21 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
new file mode 100644
index 00000000000..4c1adeec839
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import
org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl;
+import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
+import
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Build the new target member assignments based on the provided parameters by
calling the task assignor.
+ * As a result,
+ * it yields the records that must be persisted to the log and the new member
assignments as a map from member ID to tasks tuple.
+ * <p>
+ * Records are only created for members which have a new target assignment. If
their assignment did not change, no new record is needed.
+ * <p>
+ * When a member is deleted, it is assumed that its target assignment record
is deleted as part of the member deletion process. In other
+ * words, this class does not yield a tombstone for removed members.
+ */
+public class TargetAssignmentBuilder {
+
+ /**
+ * The group ID.
+ */
+ private final String groupId;
+ /**
+ * The group epoch.
+ */
+ private final int groupEpoch;
+
+ /**
+ * The partition assignor used to compute the assignment.
+ */
+ private final TaskAssignor assignor;
+
+ /**
+ * The assignment configs.
+ */
+ private final Map<String, String> assignmentConfigs;
+
+ /**
+ * The members which have been updated or deleted. A null value signals
deleted members.
+ */
+ private final Map<String, StreamsGroupMember> updatedMembers = new
HashMap<>();
+
+ /**
+ * The members in the group.
+ */
+ private Map<String, StreamsGroupMember> members = Map.of();
+
+ /**
+ * The partition metadata.
+ */
+ private Map<String,
org.apache.kafka.coordinator.group.streams.TopicMetadata> partitionMetadata =
Map.of();
+
+ /**
+ * The existing target assignment.
+ */
+ private Map<String, org.apache.kafka.coordinator.group.streams.TasksTuple>
targetAssignment = Map.of();
+
+ /**
+ * The topology.
+ */
+ private ConfiguredTopology topology;
+
+ /**
+ * The static members in the group.
+ */
+ private Map<String, String> staticMembers = Map.of();
+
+ /**
+ * Constructs the object.
+ *
+ * @param groupId The group ID.
+ * @param groupEpoch The group epoch to compute a target assignment for.
+ * @param assignor The assignor to use to compute the target assignment.
+ */
+ public TargetAssignmentBuilder(
+ String groupId,
+ int groupEpoch,
+ TaskAssignor assignor,
+ Map<String, String> assignmentConfigs
+ ) {
+ this.groupId = Objects.requireNonNull(groupId);
+ this.groupEpoch = groupEpoch;
+ this.assignor = Objects.requireNonNull(assignor);
+ this.assignmentConfigs = Objects.requireNonNull(assignmentConfigs);
+ }
+
+ static AssignmentMemberSpec createAssignmentMemberSpec(
+ StreamsGroupMember member,
+ TasksTuple targetAssignment
+ ) {
+ return new AssignmentMemberSpec(
+ member.instanceId(),
+ member.rackId(),
+ targetAssignment.activeTasks(),
+ targetAssignment.standbyTasks(),
+ targetAssignment.warmupTasks(),
+ member.processId(),
+ member.clientTags(),
+ Map.of(),
+ Map.of()
+ );
+ }
+
+ /**
+ * Adds all the existing members.
+ *
+ * @param members The existing members in the streams group.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder withMembers(
+ Map<String, StreamsGroupMember> members
+ ) {
+ this.members = members;
+ return this;
+ }
+
+ /**
+ * Adds all the existing static members.
+ *
+ * @param staticMembers The existing static members in the streams group.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder withStaticMembers(
+ Map<String, String> staticMembers
+ ) {
+ this.staticMembers = staticMembers;
+ return this;
+ }
+
+ /**
+ * Adds the partition metadata to use.
+ *
+ * @param partitionMetadata The partition metadata.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder withPartitionMetadata(
+ Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata>
partitionMetadata
+ ) {
+ this.partitionMetadata = partitionMetadata;
+ return this;
+ }
+
+ /**
+ * Adds the existing target assignment.
+ *
+ * @param targetAssignment The existing target assignment.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder withTargetAssignment(
+ Map<String, org.apache.kafka.coordinator.group.streams.TasksTuple>
targetAssignment
+ ) {
+ this.targetAssignment = targetAssignment;
+ return this;
+ }
+
+ /**
+ * Adds the topology image.
+ *
+ * @param topology The topology.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder withTopology(
+ ConfiguredTopology topology
+ ) {
+ this.topology = topology;
+ return this;
+ }
+
+
+ /**
+ * Adds or updates a member. This is useful when the updated member is not
yet materialized in memory.
+ *
+ * @param memberId The member ID.
+ * @param member The member to add or update.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder addOrUpdateMember(
+ String memberId,
+ StreamsGroupMember member
+ ) {
+ this.updatedMembers.put(memberId, member);
+ return this;
+ }
+
+ /**
+ * Removes a member. This is useful when the removed member is not yet
materialized in memory.
+ *
+ * @param memberId The member ID.
+ * @return This object.
+ */
+ public TargetAssignmentBuilder removeMember(
+ String memberId
+ ) {
+ return addOrUpdateMember(memberId, null);
+ }
+
+ /**
+ * Builds the new target assignment.
+ *
+ * @return A TargetAssignmentResult which contains the records to update
the existing target assignment.
+ * @throws TaskAssignorException if the target assignment cannot be
computed.
+ */
+ public TargetAssignmentResult build() throws TaskAssignorException {
+ Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
+
+ // Prepare the member spec for all members.
+ members.forEach((memberId, member) -> memberSpecs.put(memberId,
createAssignmentMemberSpec(
+ member,
+ targetAssignment.getOrDefault(memberId,
org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY)
+ )));
+
+ // Update the member spec if updated or deleted members.
+ updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
+ if (updatedMemberOrNull == null) {
+ memberSpecs.remove(memberId);
+ } else {
+ org.apache.kafka.coordinator.group.streams.TasksTuple
assignment = targetAssignment.getOrDefault(memberId,
+
org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY);
+
+ // A new static member joins and needs to replace an existing
departed one.
+ if (updatedMemberOrNull.instanceId().isPresent()) {
+ String previousMemberId =
staticMembers.get(updatedMemberOrNull.instanceId().get());
+ if (previousMemberId != null &&
!previousMemberId.equals(memberId)) {
+ assignment =
targetAssignment.getOrDefault(previousMemberId,
+
org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY);
+ }
+ }
+
+ memberSpecs.put(memberId, createAssignmentMemberSpec(
+ updatedMemberOrNull,
+ assignment
+ ));
+ }
+ });
+
+ // Compute the assignment.
+ GroupAssignment newGroupAssignment;
+ if (topology.isReady()) {
+ if (topology.subtopologies().isEmpty()) {
+ throw new IllegalStateException("Subtopologies must be present
if topology is ready.");
+ }
+ newGroupAssignment = assignor.assign(
+ new GroupSpecImpl(
+ Collections.unmodifiableMap(memberSpecs),
+ assignmentConfigs
+ ),
+ new TopologyMetadata(partitionMetadata,
topology.subtopologies().get())
+ );
+ } else {
+ newGroupAssignment = new GroupAssignment(
+ memberSpecs.keySet().stream().collect(Collectors.toMap(x -> x,
x -> MemberAssignment.empty())));
+ }
+
+ // Compute delta from previous to new target assignment and create the
+ // relevant records.
+ List<CoordinatorRecord> records = new ArrayList<>();
+ Map<String, org.apache.kafka.coordinator.group.streams.TasksTuple>
newTargetAssignment = new HashMap<>();
+
+ memberSpecs.keySet().forEach(memberId -> {
+ org.apache.kafka.coordinator.group.streams.TasksTuple
oldMemberAssignment = targetAssignment.get(memberId);
+ org.apache.kafka.coordinator.group.streams.TasksTuple
newMemberAssignment = newMemberAssignment(newGroupAssignment, memberId);
+
+ newTargetAssignment.put(memberId, newMemberAssignment);
+
+ if (oldMemberAssignment == null) {
+ // If the member had no assignment, we always create a record
for it.
+
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+ groupId,
+ memberId,
+ newMemberAssignment
+ ));
+ } else {
+ // If the member had an assignment, we only create a record if
the
+ // new assignment is different.
+ if (!newMemberAssignment.equals(oldMemberAssignment)) {
+
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+ groupId,
+ memberId,
+ newMemberAssignment
+ ));
+ }
+ }
+ });
+
+ // Bump the target assignment epoch.
+
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
groupEpoch));
+
+ return new TargetAssignmentResult(records, newTargetAssignment);
+ }
+
+ private TasksTuple newMemberAssignment(
+ GroupAssignment newGroupAssignment,
+ String memberId
+ ) {
+ MemberAssignment newMemberAssignment =
newGroupAssignment.members().get(memberId);
+ if (newMemberAssignment != null) {
+ return new TasksTuple(
+ newMemberAssignment.activeTasks(),
+ newMemberAssignment.standbyTasks(),
+ newMemberAssignment.warmupTasks()
+ );
+ } else {
+ return TasksTuple.EMPTY;
+ }
+ }
+
+ /**
+ * The assignment result returned by {{@link
TargetAssignmentBuilder#build()}}.
+ *
+ * @param records The records that must be applied to the
__consumer_offsets topics to persist the new target assignment.
+ * @param targetAssignment The new target assignment for the group.
+ */
+ public record TargetAssignmentResult(
+ List<CoordinatorRecord> records,
+ Map<String, TasksTuple> targetAssignment
+ ) {
+ public TargetAssignmentResult {
+ Objects.requireNonNull(records);
+ Objects.requireNonNull(targetAssignment);
+ }
+ }
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
new file mode 100644
index 00000000000..d1119cfe011
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.stream.Stream;
+
+/**
+ * The topology metadata class is used by the {@link
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor} to get topic
and
+ * partition metadata for the topology that the streams group using.
+ *
+ * @param topicMetadata The topic Ids mapped to their corresponding {@link
TopicMetadata} object, which contains topic and partition
+ * metadata.
+ * @param subtopologyMap The configured subtopologies
+ */
+public record TopologyMetadata(Map<String, TopicMetadata> topicMetadata,
SortedMap<String, ConfiguredSubtopology> subtopologyMap) implements
TopologyDescriber {
+
+ public TopologyMetadata {
+ topicMetadata =
Objects.requireNonNull(Collections.unmodifiableMap(topicMetadata));
+ subtopologyMap =
Objects.requireNonNull(Collections.unmodifiableSortedMap(subtopologyMap));
+ }
+
+ /**
+ * Map of topic names to topic metadata.
+ *
+ * @return The map of topic Ids to topic metadata.
+ */
+ @Override
+ public Map<String, TopicMetadata> topicMetadata() {
+ return this.topicMetadata;
+ }
+
+ /**
+ * Checks whether the given subtopology is associated with a changelog
topic.
+ *
+ * @param subtopologyId String identifying the subtopology.
+ * @throws NoSuchElementException if the subtopology ID does not exist.
+ * @return true if the subtopology is associated with a changelog topic,
false otherwise.
+ */
+ @Override
+ public boolean isStateful(String subtopologyId) {
+ final ConfiguredSubtopology subtopology =
getSubtopologyOrFail(subtopologyId);
+ return !subtopology.stateChangelogTopics().isEmpty();
+ }
+
+ /**
+ * The list of subtopologies in the topology.
+ *
+ * @return a list of subtopology IDs.
+ */
+ @Override
+ public List<String> subtopologies() {
+ return subtopologyMap.keySet().stream().toList();
+ }
+
+ /**
+ * The maximal number of input partitions among all source topics for the
given subtopology.
+ *
+ * @param subtopologyId String identifying the subtopology.
+ *
+ * @throws NoSuchElementException if the subtopology ID does not exist.
+ * @throws IllegalStateException if the subtopology contains no source
topics.
+ * @return The maximal number of input partitions among all source topics
for the given subtopology.
+ */
+ @Override
+ public int maxNumInputPartitions(String subtopologyId) {
+ final ConfiguredSubtopology subtopology =
getSubtopologyOrFail(subtopologyId);
+ return Stream.concat(
+ subtopology.sourceTopics().stream(),
+ subtopology.repartitionSourceTopics().keySet().stream()
+ ).map(topic ->
this.topicMetadata.get(topic).numPartitions()).max(Integer::compareTo).orElseThrow(
+ () -> new IllegalStateException("Subtopology does not contain any
source topics")
+ );
+ }
+
+ private ConfiguredSubtopology getSubtopologyOrFail(String subtopologyId) {
+ if (!subtopologyMap.containsKey(subtopologyId)) {
+ throw new NoSuchElementException(String.format("Topology does not
contain subtopology %s", subtopologyId));
+ }
+ return subtopologyMap.get(subtopologyId);
+ }
+
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
index ce0bc101101..3e980bad93e 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
@@ -46,7 +46,7 @@ public class MockAssignor implements TaskAssignor {
Map<String, String[]> subtopologyToActiveMember = new HashMap<>();
for (String subtopology : topologyDescriber.subtopologies()) {
- int numberOfPartitions = topologyDescriber.numTasks(subtopology);
+ int numberOfPartitions =
topologyDescriber.maxNumInputPartitions(subtopology);
subtopologyToActiveMember.put(subtopology, new
String[numberOfPartitions]);
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
index 00ba701d3e2..c8d3c97d504 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
@@ -72,7 +72,7 @@ public class StickyTaskAssignor implements TaskAssignor {
Set<TaskId> ret = new HashSet<>();
for (String subtopology : topologyDescriber.subtopologies()) {
if (isActive || topologyDescriber.isStateful(subtopology)) {
- int numberOfPartitions =
topologyDescriber.numTasks(subtopology);
+ int numberOfPartitions =
topologyDescriber.maxNumInputPartitions(subtopology);
for (int i = 0; i < numberOfPartitions; i++) {
ret.add(new TaskId(subtopology, i));
}
@@ -85,7 +85,7 @@ public class StickyTaskAssignor implements TaskAssignor {
localState = new LocalState();
localState.allTasks = 0;
for (String subtopology : topologyDescriber.subtopologies()) {
- int numberOfPartitions = topologyDescriber.numTasks(subtopology);
+ int numberOfPartitions =
topologyDescriber.maxNumInputPartitions(subtopology);
localState.allTasks += numberOfPartitions;
}
localState.totalCapacity = groupSpec.members().size();
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java
index 2f913bf5514..d6f7a3ab579 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java
@@ -20,30 +20,34 @@ import java.util.List;
import java.util.NoSuchElementException;
/**
- * The subscribed topic describer is used by the {@link TaskAssignor} to
obtain topic and task metadata of the groups topology.
+ * The topology describer is used by the {@link TaskAssignor} to get topic and
task metadata of the group's topology.
*/
public interface TopologyDescriber {
/**
+ * Map of topic names to topic metadata.
+ *
* @return The list of subtopologies IDs.
*/
List<String> subtopologies();
/**
- * The number of tasks for the given subtopology.
+ * The maximal number of input partitions among all source topics for the
given subtopology.
*
* @param subtopologyId String identifying the subtopology.
*
- * @return The number of tasks corresponding to the given subtopology ID.
- * @throws NoSuchElementException if subtopology does not exist in the
topology.
+ * @throws NoSuchElementException if the subtopology ID does not exist.
+ * @throws IllegalStateException if the subtopology contains no source
topics.
+ * @return The maximal number of input partitions among all source topics
for the given subtopology.
*/
- int numTasks(String subtopologyId) throws NoSuchElementException;
+ int maxNumInputPartitions(String subtopologyId) throws
NoSuchElementException;
/**
- * Whether the given subtopology is stateful.
+ * Checks whether the given subtopology is associated with a changelog
topic.
*
* @param subtopologyId String identifying the subtopology.
- * @return true if the subtopology is stateful, false otherwise.
+ * @throws NoSuchElementException if the subtopology ID does not exist.
+ * @return true if the subtopology is associated with a changelog topic,
false otherwise.
*/
boolean isStateful(String subtopologyId);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
index b6ccb87f7a2..6c7eede16fc 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.SortedMap;
import java.util.stream.Collectors;
/**
@@ -41,7 +42,7 @@ import java.util.stream.Collectors;
* reported back to the client.
*/
public record ConfiguredTopology(int topologyEpoch,
- Optional<Map<String, ConfiguredSubtopology>>
subtopologies,
+ Optional<SortedMap<String,
ConfiguredSubtopology>> subtopologies,
Map<String, CreatableTopic>
internalTopicsToBeCreated,
Optional<TopicConfigurationException>
topicConfigurationException) {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
index 31029d9fd9e..33d9f2c4874 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
@@ -34,6 +34,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -74,12 +76,16 @@ public class InternalTopicManager {
Map<String, Integer> decidedPartitionCountsForInternalTopics =
decidePartitionCounts(logContext, topology, topicMetadata,
copartitionGroupsBySubtopology, log);
- final Map<String, ConfiguredSubtopology> configuredSubtopologies =
+ final SortedMap<String, ConfiguredSubtopology>
configuredSubtopologies =
subtopologies.stream()
.collect(Collectors.toMap(
StreamsGroupTopologyValue.Subtopology::subtopologyId,
- x -> fromPersistedSubtopology(x,
decidedPartitionCountsForInternalTopics))
- );
+ x -> fromPersistedSubtopology(x,
decidedPartitionCountsForInternalTopics),
+ (v1, v2) -> {
+ throw new
RuntimeException(String.format("Duplicate key for values %s and %s", v1, v2));
+ },
+ TreeMap::new
+ ));
Map<String, CreatableTopic> internalTopicsToCreate =
missingInternalTopics(configuredSubtopologies, topicMetadata);
if (!internalTopicsToCreate.isEmpty()) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
new file mode 100644
index 00000000000..114974558b8
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -0,0 +1,852 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.MetadataImageBuilder;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
+import
org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl;
+import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
+import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord;
+import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord;
+import static
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec;
+import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksTuple;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TargetAssignmentBuilderTest {
+
+ @Test
+ public void testBuildEmptyAssignmentWhenTopologyNotReady() {
+ String groupId = "test-group";
+ int groupEpoch = 1;
+ TaskAssignor assignor = mock(TaskAssignor.class);
+ ConfiguredTopology topology = mock(ConfiguredTopology.class);
+ Map<String, String> assignmentConfigs = new HashMap<>();
+
+ when(topology.isReady()).thenReturn(false);
+
+ TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId,
groupEpoch, assignor, assignmentConfigs)
+ .withTopology(topology);
+
+ TargetAssignmentBuilder.TargetAssignmentResult result =
builder.build();
+
+ List<CoordinatorRecord> expectedRecords = Collections.singletonList(
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
groupEpoch)
+ );
+
+ assertEquals(expectedRecords, result.records());
+ assertEquals(Collections.emptyMap(), result.targetAssignment());
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testCreateAssignmentMemberSpec(TaskRole taskRole) {
+ String fooSubtopologyId = Uuid.randomUuid().toString();
+ String barSubtopologyId = Uuid.randomUuid().toString();
+
+ final Map<String, String> clientTags = mkMap(mkEntry("tag1",
"value1"), mkEntry("tag2", "value2"));
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member-id")
+ .setRackId("rackId")
+ .setInstanceId("instanceId")
+ .setProcessId("processId")
+ .setClientTags(clientTags)
+ .build();
+
+ TasksTuple assignment = mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2, 3),
+ mkTasks(barSubtopologyId, 1, 2, 3)
+ );
+
+ AssignmentMemberSpec assignmentMemberSpec = createAssignmentMemberSpec(
+ member,
+ assignment
+ );
+
+ assertEquals(new AssignmentMemberSpec(
+ Optional.of("instanceId"),
+ Optional.of("rackId"),
+ assignment.activeTasks(),
+ assignment.standbyTasks(),
+ assignment.warmupTasks(),
+ "processId",
+ clientTags,
+ Map.of(),
+ Map.of()
+ ), assignmentMemberSpec);
+ }
+
+ @Test
+ public void testEmpty() {
+ TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
+ "my-group",
+ 20
+ );
+
+
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
result = context.build();
+ assertEquals(List.of(newStreamsGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ )), result.records());
+ assertEquals(Map.of(), result.targetAssignment());
+ }
+
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testAssignmentHasNotChanged(TaskRole taskRole) {
+ TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
+ "my-group",
+ 20
+ );
+
+ String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
+ String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+
+ context.addGroupMember("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2, 3),
+ mkTasks(barSubtopologyId, 1, 2, 3)
+ ));
+
+ context.addGroupMember("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 4, 5, 6),
+ mkTasks(barSubtopologyId, 4, 5, 6)
+ ));
+
+ context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2, 3),
+ mkTasks(barSubtopologyId, 1, 2, 3)
+ ));
+
+ context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 4, 5, 6),
+ mkTasks(barSubtopologyId, 4, 5, 6)
+ ));
+
+
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
result = context.build();
+
+ assertEquals(List.of(newStreamsGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ )), result.records());
+
+ Map<String, TasksTuple> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2, 3),
+ mkTasks(barSubtopologyId, 1, 2, 3)
+ ));
+ expectedAssignment.put("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 4, 5, 6),
+ mkTasks(barSubtopologyId, 4, 5, 6)
+ ));
+
+ assertEquals(expectedAssignment, result.targetAssignment());
+ }
+
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testAssignmentSwapped(TaskRole taskRole) {
+ TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
+ "my-group",
+ 20
+ );
+
+ String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
+ String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+
+ context.addGroupMember("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2, 3),
+ mkTasks(barSubtopologyId, 1, 2, 3)
+ ));
+
+ context.addGroupMember("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 4, 5, 6),
+ mkTasks(barSubtopologyId, 4, 5, 6)
+ ));
+
+ context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2, 3),
+ mkTasks(barSubtopologyId, 1, 2, 3)
+ ));
+
+ context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 4, 5, 6),
+ mkTasks(barSubtopologyId, 4, 5, 6)
+ ));
+
+
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
result = context.build();
+
+ assertEquals(3, result.records().size());
+
+ assertUnorderedRecordsEquals(List.of(List.of(
+ newStreamsGroupTargetAssignmentRecord("my-group", "member-1",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 4, 5, 6),
+ mkTasks(barSubtopologyId, 4, 5, 6)
+ )),
+ newStreamsGroupTargetAssignmentRecord("my-group", "member-2",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2, 3),
+ mkTasks(barSubtopologyId, 1, 2, 3)
+ ))
+ )), result.records().subList(0, 2));
+
+ assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ ), result.records().get(2));
+
+ Map<String, TasksTuple> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2, 3),
+ mkTasks(barSubtopologyId, 1, 2, 3)
+ ));
+ expectedAssignment.put("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 4, 5, 6),
+ mkTasks(barSubtopologyId, 4, 5, 6)
+ ));
+
+ assertEquals(expectedAssignment, result.targetAssignment());
+ }
+
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testNewMember(TaskRole taskRole) {
+ TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
+ "my-group",
+ 20
+ );
+
+ String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
+ String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+
+ context.addGroupMember("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2, 3),
+ mkTasks(barSubtopologyId, 1, 2, 3)
+ ));
+
+ context.addGroupMember("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 4, 5, 6),
+ mkTasks(barSubtopologyId, 4, 5, 6)
+ ));
+
+ context.updateMemberMetadata("member-3");
+
+ context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2),
+ mkTasks(barSubtopologyId, 1, 2)
+ ));
+
+ context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 3, 4),
+ mkTasks(barSubtopologyId, 3, 4)
+ ));
+
+ context.prepareMemberAssignment("member-3", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 5, 6),
+ mkTasks(barSubtopologyId, 5, 6)
+ ));
+
+
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
result = context.build();
+
+ assertEquals(4, result.records().size());
+
+ assertUnorderedRecordsEquals(List.of(List.of(
+ newStreamsGroupTargetAssignmentRecord("my-group", "member-1",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2),
+ mkTasks(barSubtopologyId, 1, 2)
+ )),
+ newStreamsGroupTargetAssignmentRecord("my-group", "member-2",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 3, 4),
+ mkTasks(barSubtopologyId, 3, 4)
+ )),
+ newStreamsGroupTargetAssignmentRecord("my-group", "member-3",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 5, 6),
+ mkTasks(barSubtopologyId, 5, 6)
+ ))
+ )), result.records().subList(0, 3));
+
+ assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ ), result.records().get(3));
+
+ Map<String, TasksTuple> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2),
+ mkTasks(barSubtopologyId, 1, 2)
+ ));
+ expectedAssignment.put("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 3, 4),
+ mkTasks(barSubtopologyId, 3, 4)
+ ));
+ expectedAssignment.put("member-3", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 5, 6),
+ mkTasks(barSubtopologyId, 5, 6)
+ ));
+
+ assertEquals(expectedAssignment, result.targetAssignment());
+ }
+
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testUpdateMember(TaskRole taskRole) {
+ TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
+ "my-group",
+ 20
+ );
+
+ String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
+ String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+
+ context.addGroupMember("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2, 3),
+ mkTasks(barSubtopologyId, 1, 2)
+ ));
+
+ context.addGroupMember("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 4, 5, 6),
+ mkTasks(barSubtopologyId, 3, 4)
+ ));
+
+ context.addGroupMember("member-3", mkTasksTuple(taskRole,
+ mkTasks(barSubtopologyId, 5, 6)
+ ));
+
+ context.updateMemberMetadata(
+ "member-3",
+ Optional.of("instance-id-3"),
+ Optional.of("rack-0")
+ );
+
+ context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2),
+ mkTasks(barSubtopologyId, 1, 2)
+ ));
+
+ context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 3, 4),
+ mkTasks(barSubtopologyId, 3, 4)
+ ));
+
+ context.prepareMemberAssignment("member-3", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 5, 6),
+ mkTasks(barSubtopologyId, 5, 6)
+ ));
+
+
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
result = context.build();
+
+ assertEquals(4, result.records().size());
+
+ assertUnorderedRecordsEquals(List.of(List.of(
+ newStreamsGroupTargetAssignmentRecord("my-group", "member-1",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2),
+ mkTasks(barSubtopologyId, 1, 2)
+ )),
+ newStreamsGroupTargetAssignmentRecord("my-group", "member-2",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 3, 4),
+ mkTasks(barSubtopologyId, 3, 4)
+ )),
+ newStreamsGroupTargetAssignmentRecord("my-group", "member-3",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 5, 6),
+ mkTasks(barSubtopologyId, 5, 6)
+ ))
+ )), result.records().subList(0, 3));
+
+ assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ ), result.records().get(3));
+
+ Map<String, TasksTuple> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2),
+ mkTasks(barSubtopologyId, 1, 2)
+ ));
+ expectedAssignment.put("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 3, 4),
+ mkTasks(barSubtopologyId, 3, 4)
+ ));
+ expectedAssignment.put("member-3", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 5, 6),
+ mkTasks(barSubtopologyId, 5, 6)
+ ));
+
+ assertEquals(expectedAssignment, result.targetAssignment());
+ }
+
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testPartialAssignmentUpdate(TaskRole taskRole) {
+ TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
+ "my-group",
+ 20
+ );
+
+ String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6, mkMapOfPartitionRacks(6));
+ String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6, mkMapOfPartitionRacks(6));
+
+ context.addGroupMember("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2),
+ mkTasks(barSubtopologyId, 1, 2)
+ ));
+
+ context.addGroupMember("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 3, 4),
+ mkTasks(barSubtopologyId, 3, 4)
+ ));
+
+ context.addGroupMember("member-3", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 5, 6),
+ mkTasks(barSubtopologyId, 5, 6)
+ ));
+
+ context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2),
+ mkTasks(barSubtopologyId, 1, 2)
+ ));
+
+ context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 3, 4, 5),
+ mkTasks(barSubtopologyId, 3, 4, 5)
+ ));
+
+ context.prepareMemberAssignment("member-3", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 6),
+ mkTasks(barSubtopologyId, 6)
+ ));
+
+
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
result = context.build();
+
+ assertEquals(3, result.records().size());
+
+ // Member 1 has no record because its assignment did not change.
+ assertUnorderedRecordsEquals(List.of(List.of(
+ newStreamsGroupTargetAssignmentRecord("my-group", "member-2",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 3, 4, 5),
+ mkTasks(barSubtopologyId, 3, 4, 5)
+ )),
+ newStreamsGroupTargetAssignmentRecord("my-group", "member-3",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 6),
+ mkTasks(barSubtopologyId, 6)
+ ))
+ )), result.records().subList(0, 2));
+
+ assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ ), result.records().get(2));
+
+ Map<String, TasksTuple> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2),
+ mkTasks(barSubtopologyId, 1, 2)
+ ));
+ expectedAssignment.put("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 3, 4, 5),
+ mkTasks(barSubtopologyId, 3, 4, 5)
+ ));
+ expectedAssignment.put("member-3", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 6),
+ mkTasks(barSubtopologyId, 6)
+ ));
+
+ assertEquals(expectedAssignment, result.targetAssignment());
+ }
+
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testDeleteMember(TaskRole taskRole) {
+ TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
+ "my-group",
+ 20
+ );
+
+ String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
+ String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+
+ context.addGroupMember("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2),
+ mkTasks(barSubtopologyId, 1, 2)
+ ));
+
+ context.addGroupMember("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 3, 4),
+ mkTasks(barSubtopologyId, 3, 4)
+ ));
+
+ context.addGroupMember("member-3", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 5, 6),
+ mkTasks(barSubtopologyId, 5, 6)
+ ));
+
+ context.removeMember("member-3");
+
+ context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2, 3),
+ mkTasks(barSubtopologyId, 1, 2, 3)
+ ));
+
+ context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 4, 5, 6),
+ mkTasks(barSubtopologyId, 4, 5, 6)
+ ));
+
+
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
result = context.build();
+
+ assertEquals(3, result.records().size());
+
+ assertUnorderedRecordsEquals(List.of(List.of(
+ newStreamsGroupTargetAssignmentRecord("my-group", "member-1",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2, 3),
+ mkTasks(barSubtopologyId, 1, 2, 3)
+ )),
+ newStreamsGroupTargetAssignmentRecord("my-group", "member-2",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 4, 5, 6),
+ mkTasks(barSubtopologyId, 4, 5, 6)
+ ))
+ )), result.records().subList(0, 2));
+
+ assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ ), result.records().get(2));
+
+ Map<String, TasksTuple> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2, 3),
+ mkTasks(barSubtopologyId, 1, 2, 3)
+ ));
+ expectedAssignment.put("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 4, 5, 6),
+ mkTasks(barSubtopologyId, 4, 5, 6)
+ ));
+
+ assertEquals(expectedAssignment, result.targetAssignment());
+ }
+
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testReplaceStaticMember(TaskRole taskRole) {
+ TargetAssignmentBuilderTestContext context = new
TargetAssignmentBuilderTestContext(
+ "my-group",
+ 20
+ );
+
+ String fooSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
+ String barSubtopologyId =
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+
+ context.addGroupMember("member-1", "instance-member-1",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2),
+ mkTasks(barSubtopologyId, 1, 2)
+ ));
+
+ context.addGroupMember("member-2", "instance-member-2",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 3, 4),
+ mkTasks(barSubtopologyId, 3, 4)
+ ));
+
+ context.addGroupMember("member-3", "instance-member-3",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 5, 6),
+ mkTasks(barSubtopologyId, 5, 6)
+ ));
+
+ // Static member 3 leaves
+ context.removeMember("member-3");
+
+ // Another static member joins with the same instance id as the
departed one
+ context.updateMemberMetadata("member-3-a",
Optional.of("instance-member-3"),
+ Optional.empty());
+
+ context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2),
+ mkTasks(barSubtopologyId, 1, 2)
+ ));
+
+ context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 3, 4),
+ mkTasks(barSubtopologyId, 3, 4)
+ ));
+
+ context.prepareMemberAssignment("member-3-a", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 5, 6),
+ mkTasks(barSubtopologyId, 5, 6)
+ ));
+
+ TargetAssignmentBuilder.TargetAssignmentResult result =
context.build();
+
+ assertEquals(2, result.records().size());
+
+ assertUnorderedRecordsEquals(List.of(List.of(
+ newStreamsGroupTargetAssignmentRecord("my-group", "member-3-a",
mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 5, 6),
+ mkTasks(barSubtopologyId, 5, 6)
+ ))
+ )), result.records().subList(0, 1));
+
+ assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+ "my-group",
+ 20
+ ), result.records().get(1));
+
+ Map<String, TasksTuple> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-1", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 1, 2),
+ mkTasks(barSubtopologyId, 1, 2)
+ ));
+ expectedAssignment.put("member-2", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 3, 4),
+ mkTasks(barSubtopologyId, 3, 4)
+ ));
+
+ expectedAssignment.put("member-3-a", mkTasksTuple(taskRole,
+ mkTasks(fooSubtopologyId, 5, 6),
+ mkTasks(barSubtopologyId, 5, 6)
+ ));
+
+ assertEquals(expectedAssignment, result.targetAssignment());
+ }
+
+ public static class TargetAssignmentBuilderTestContext {
+
+ private final String groupId;
+ private final int groupEpoch;
+ private final TaskAssignor assignor = mock(TaskAssignor.class);
+ private final SortedMap<String, ConfiguredSubtopology> subtopologies =
new TreeMap<>();
+ private final ConfiguredTopology topology = new ConfiguredTopology(0,
Optional.of(subtopologies), new HashMap<>(),
+ Optional.empty());
+ private final Map<String, StreamsGroupMember> members = new
HashMap<>();
+ private final Map<String,
org.apache.kafka.coordinator.group.streams.TopicMetadata> subscriptionMetadata
= new HashMap<>();
+ private final Map<String, StreamsGroupMember> updatedMembers = new
HashMap<>();
+ private final Map<String, TasksTuple> targetAssignment = new
HashMap<>();
+ private final Map<String, MemberAssignment> memberAssignments = new
HashMap<>();
+ private final Map<String, String> staticMembers = new HashMap<>();
+ private MetadataImageBuilder topicsImageBuilder = new
MetadataImageBuilder();
+
+ public TargetAssignmentBuilderTestContext(
+ String groupId,
+ int groupEpoch
+ ) {
+ this.groupId = groupId;
+ this.groupEpoch = groupEpoch;
+ }
+
+ public void addGroupMember(
+ String memberId,
+ TasksTuple targetTasks
+ ) {
+ addGroupMember(memberId, null, targetTasks);
+ }
+
+ private void addGroupMember(
+ String memberId,
+ String instanceId,
+ TasksTuple targetTasks
+ ) {
+ StreamsGroupMember.Builder memberBuilder = new
StreamsGroupMember.Builder(memberId);
+ memberBuilder.setProcessId("processId");
+ memberBuilder.setClientTags(Map.of());
+ memberBuilder.setUserEndpoint(new
StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090));
+
+ if (instanceId != null) {
+ memberBuilder.setInstanceId(instanceId);
+ staticMembers.put(instanceId, memberId);
+ } else {
+ memberBuilder.setInstanceId(null);
+ }
+ memberBuilder.setRackId(null);
+ members.put(memberId, memberBuilder.build());
+ targetAssignment.put(memberId, targetTasks);
+ }
+
+ public String addSubtopologyWithSingleSourceTopic(
+ String topicName,
+ int numTasks,
+ Map<Integer, Set<String>> partitionRacks
+ ) {
+ String subtopologyId = Uuid.randomUuid().toString();
+ Uuid topicId = Uuid.randomUuid();
+ subscriptionMetadata.put(topicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(
+ topicId,
+ topicName,
+ numTasks,
+ partitionRacks
+ ));
+ topicsImageBuilder = topicsImageBuilder.addTopic(topicId,
topicName, numTasks);
+ subtopologies.put(subtopologyId, new
ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(),
Map.of()));
+
+ return subtopologyId;
+ }
+
+ public void updateMemberMetadata(
+ String memberId
+ ) {
+ updateMemberMetadata(
+ memberId,
+ Optional.empty(),
+ Optional.empty()
+ );
+ }
+
+ public void updateMemberMetadata(
+ String memberId,
+ Optional<String> instanceId,
+ Optional<String> rackId
+ ) {
+ StreamsGroupMember existingMember = members.get(memberId);
+ StreamsGroupMember.Builder builder;
+ if (existingMember != null) {
+ builder = new StreamsGroupMember.Builder(existingMember);
+ } else {
+ builder = new StreamsGroupMember.Builder(memberId);
+ builder.setProcessId("processId");
+ builder.setRackId(null);
+ builder.setInstanceId(null);
+ builder.setClientTags(Map.of());
+ builder.setUserEndpoint(new
StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090));
+ }
+ updatedMembers.put(memberId, builder
+ .maybeUpdateInstanceId(instanceId)
+ .maybeUpdateRackId(rackId)
+ .build());
+ }
+
+ public void removeMember(
+ String memberId
+ ) {
+ this.updatedMembers.put(memberId, null);
+ }
+
+ public void prepareMemberAssignment(
+ String memberId,
+ TasksTuple assignment
+ ) {
+ memberAssignments.put(memberId, new
MemberAssignment(assignment.activeTasks(), assignment.standbyTasks(),
assignment.warmupTasks()));
+ }
+
+ public
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
build() {
+ // Prepare expected member specs.
+ Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
+
+ // All the existing members are prepared.
+ members.forEach((memberId, member) ->
+ memberSpecs.put(memberId, createAssignmentMemberSpec(
+ member,
+ targetAssignment.getOrDefault(memberId,
TasksTuple.EMPTY)
+ )
+ ));
+
+ // All the updated are added and all the deleted
+ // members are removed.
+ updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
+ if (updatedMemberOrNull == null) {
+ memberSpecs.remove(memberId);
+ } else {
+ TasksTuple assignment =
targetAssignment.getOrDefault(memberId,
+ TasksTuple.EMPTY);
+
+ // A new static member joins and needs to replace an
existing departed one.
+ if (updatedMemberOrNull.instanceId().isPresent()) {
+ String previousMemberId =
staticMembers.get(updatedMemberOrNull.instanceId().get());
+ if (previousMemberId != null &&
!previousMemberId.equals(memberId)) {
+ assignment =
targetAssignment.getOrDefault(previousMemberId,
+ TasksTuple.EMPTY);
+ }
+ }
+
+ memberSpecs.put(memberId, createAssignmentMemberSpec(
+ updatedMemberOrNull,
+ assignment
+ ));
+ }
+ });
+
+ // Prepare the expected topology metadata.
+ TopologyMetadata topologyMetadata = new
TopologyMetadata(subscriptionMetadata, subtopologies);
+
+ // Prepare the expected assignment spec.
+ GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, new
HashMap<>());
+
+ // We use `any` here to always return an assignment but use
`verify` later on
+ // to ensure that the input was correct.
+ when(assignor.assign(any(), any()))
+ .thenReturn(new GroupAssignment(memberAssignments));
+
+ // Create and populate the assignment builder.
+ org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder
builder = new
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder(
+ groupId, groupEpoch, assignor, Map.of())
+ .withMembers(members)
+ .withTopology(topology)
+ .withStaticMembers(staticMembers)
+ .withPartitionMetadata(subscriptionMetadata)
+ .withTargetAssignment(targetAssignment);
+
+ // Add the updated members or delete the deleted members.
+ updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
+ if (updatedMemberOrNull != null) {
+ builder.addOrUpdateMember(memberId, updatedMemberOrNull);
+ } else {
+ builder.removeMember(memberId);
+ }
+ });
+
+ // Execute the builder.
+
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
result = builder.build();
+
+ // Verify that the assignor was called once with the expected
+ // assignment spec.
+ verify(assignor, times(1))
+ .assign(groupSpec, topologyMetadata);
+
+ return result;
+ }
+ }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
new file mode 100644
index 00000000000..a5c18a6f0f2
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import
org.apache.kafka.coordinator.group.streams.topics.ConfiguredInternalTopic;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+class TopologyMetadataTest {
+
+ private Map<String, TopicMetadata> topicMetadata;
+ private SortedMap<String, ConfiguredSubtopology> subtopologyMap;
+ private TopologyMetadata topologyMetadata;
+
+ @BeforeEach
+ void setUp() {
+ topicMetadata = new HashMap<>();
+ subtopologyMap = new TreeMap<>();
+ topologyMetadata = new TopologyMetadata(topicMetadata, subtopologyMap);
+ }
+
+ @Test
+ void testTopicMetadata() {
+ assertEquals(topicMetadata, topologyMetadata.topicMetadata());
+ }
+
+ @Test
+ void testTopology() {
+ assertEquals(subtopologyMap, topologyMetadata.subtopologyMap());
+ }
+
+ @Test
+ void testIsStateful() {
+ ConfiguredInternalTopic internalTopic =
mock(ConfiguredInternalTopic.class);
+ ConfiguredSubtopology subtopology1 = mock(ConfiguredSubtopology.class);
+ ConfiguredSubtopology subtopology2 = mock(ConfiguredSubtopology.class);
+ subtopologyMap.put("subtopology1", subtopology1);
+ subtopologyMap.put("subtopology2", subtopology2);
+
when(subtopology1.stateChangelogTopics()).thenReturn(Map.of("state_changelog_topic",
internalTopic));
+ when(subtopology2.stateChangelogTopics()).thenReturn(Map.of());
+
+ assertTrue(topologyMetadata.isStateful("subtopology1"));
+ assertFalse(topologyMetadata.isStateful("subtopology2"));
+ }
+
+ @Test
+ void testMaxNumInputPartitions() {
+ ConfiguredInternalTopic internalTopic =
mock(ConfiguredInternalTopic.class);
+ ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class);
+ subtopologyMap.put("subtopology1", subtopology);
+ when(subtopology.sourceTopics()).thenReturn(Set.of("source_topic"));
+
when(subtopology.repartitionSourceTopics()).thenReturn(Map.of("repartition_source_topic",
internalTopic));
+
+ TopicMetadata topicMeta1 = mock(TopicMetadata.class);
+ TopicMetadata topicMeta2 = mock(TopicMetadata.class);
+ topicMetadata.put("source_topic", topicMeta1);
+ topicMetadata.put("repartition_source_topic", topicMeta2);
+ when(topicMeta1.numPartitions()).thenReturn(3);
+ when(topicMeta2.numPartitions()).thenReturn(4);
+
+ assertEquals(4,
topologyMetadata.maxNumInputPartitions("subtopology1"));
+ }
+
+ @Test
+ void testSubtopologies() {
+ ConfiguredSubtopology subtopology1 = mock(ConfiguredSubtopology.class);
+ ConfiguredSubtopology subtopology2 = mock(ConfiguredSubtopology.class);
+ subtopologyMap.put("subtopology1", subtopology1);
+ subtopologyMap.put("subtopology2", subtopology2);
+
+ List<String> expectedSubtopologies = List.of("subtopology1",
"subtopology2");
+ assertEquals(expectedSubtopologies, topologyMetadata.subtopologies());
+ }
+
+ @Test
+ void testIsStatefulThrowsExceptionWhenSubtopologyIdDoesNotExist() {
+ assertThrows(NoSuchElementException.class, () ->
topologyMetadata.isStateful("non_existent_subtopology"));
+ }
+
+ @Test
+ void
testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyIdDoesNotExist() {
+ assertThrows(NoSuchElementException.class, () ->
topologyMetadata.maxNumInputPartitions("non_existent_subtopology"));
+ }
+
+ @Test
+ void
testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyContainsNoSourceTopics()
{
+ ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class);
+ when(subtopology.sourceTopics()).thenReturn(Set.of());
+ when(subtopology.repartitionSourceTopics()).thenReturn(Map.of());
+ subtopologyMap.put("subtopology1", subtopology);
+
+ assertThrows(IllegalStateException.class, () ->
topologyMetadata.maxNumInputPartitions("subtopology1"));
+ }
+}
\ No newline at end of file
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java
index 25dada072df..d44b24549e0 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java
@@ -253,7 +253,7 @@ public class MockAssignorTest {
}
@Override
- public int numTasks(String subtopologyId) {
+ public int maxNumInputPartitions(String subtopologyId) {
return numPartitions;
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
index 112dd2281d8..542aad73dee 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
@@ -1094,7 +1094,7 @@ public class StickyTaskAssignorTest {
}
@Override
- public int numTasks(String subtopologyId) throws
NoSuchElementException {
+ public int maxNumInputPartitions(String subtopologyId) throws
NoSuchElementException {
return numTasks;
}
@@ -1112,7 +1112,7 @@ public class StickyTaskAssignorTest {
}
@Override
- public int numTasks(String subtopologyId) throws
NoSuchElementException {
+ public int maxNumInputPartitions(String subtopologyId) throws
NoSuchElementException {
if (subtopologyId.equals("test-subtopology1"))
return 6;
return 1;
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
index 2d6d096235a..a909629fa20 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
@@ -26,6 +26,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -53,7 +55,7 @@ public class ConfiguredTopologyTest {
assertThrows(NullPointerException.class,
() -> new ConfiguredTopology(
0,
- Optional.of(Map.of()),
+ Optional.of(new TreeMap<>()),
null,
Optional.empty()
)
@@ -77,7 +79,7 @@ public class ConfiguredTopologyTest {
assertThrows(IllegalArgumentException.class,
() -> new ConfiguredTopology(
-1,
- Optional.of(Map.of()),
+ Optional.of(new TreeMap<>()),
Collections.emptyMap(),
Optional.empty()
)
@@ -100,7 +102,7 @@ public class ConfiguredTopologyTest {
@Test
public void testIsReady() {
ConfiguredTopology readyTopology = new ConfiguredTopology(
- 1, Optional.of(Map.of()), new HashMap<>(), Optional.empty());
+ 1, Optional.of(new TreeMap<>()), new HashMap<>(),
Optional.empty());
assertTrue(readyTopology.isReady());
ConfiguredTopology notReadyTopology = new ConfiguredTopology(
@@ -114,7 +116,7 @@ public class ConfiguredTopologyTest {
ConfiguredSubtopology subtopologyMock =
mock(ConfiguredSubtopology.class);
StreamsGroupDescribeResponseData.Subtopology subtopologyResponse = new
StreamsGroupDescribeResponseData.Subtopology();
when(subtopologyMock.asStreamsGroupDescribeSubtopology(Mockito.anyString())).thenReturn(subtopologyResponse);
- Map<String, ConfiguredSubtopology> subtopologies = new HashMap<>();
+ SortedMap<String, ConfiguredSubtopology> subtopologies = new
TreeMap<>();
subtopologies.put("subtopology1", subtopologyMock);
Map<String, CreatableTopic> internalTopicsToBeCreated = new
HashMap<>();
Optional<TopicConfigurationException> topicConfigurationException =
Optional.empty();