This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aea699bdef0 KAFKA-18324: Add CurrentAssignmentBuilder (#18476)
aea699bdef0 is described below
commit aea699bdef01b169b73f14c6f1d58df456056e16
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Jan 23 17:35:03 2025 +0100
KAFKA-18324: Add CurrentAssignmentBuilder (#18476)
Implements the current assignment builder, analogous to the current
assignment builder of consumer groups. The main difference is the underlying
assigned resource, and slightly different logic around process IDs: We make
sure to move a task only to a new client, once the task is not owned anymore by
any client with the same process ID (sharing the same state directory) - in any
role (active, standby or warm-up).
Compared to the feature branch, the main difference is that I refactored
the separate treatment of active, standby and warm-up tasks into a compound
datatype called TaskTuple (which is used in place of the more specific
Assignment class). This also has effects on StreamsGroupMember.
Reviewers: Bruno Cadonna <[email protected]>, Bill Bejeck
<[email protected]>
---
.../group/streams/CurrentAssignmentBuilder.java | 451 +++++++++++
.../group/streams/StreamsGroupMember.java | 163 +---
.../streams/{Assignment.java => TasksTuple.java} | 73 +-
.../group/streams/assignor/GroupAssignment.java | 2 +-
.../streams/CurrentAssignmentBuilderTest.java | 825 +++++++++++++++++++++
.../group/streams/StreamsGroupMemberTest.java | 132 ++--
.../group/streams/TaskAssignmentTestUtil.java | 26 +-
.../{AssignmentTest.java => TasksTupleTest.java} | 95 ++-
8 files changed, 1530 insertions(+), 237 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
new file mode 100644
index 00000000000..3c9ba064a40
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine
of the streams group protocol. Given the current state of a
+ * member and a desired or target assignment state, the state machine takes
the necessary steps to converge them.
+ */
+public class CurrentAssignmentBuilder {
+
+ /**
+ * The streams group member which is reconciled.
+ */
+ private final StreamsGroupMember member;
+
+ /**
+ * The target assignment epoch.
+ */
+ private int targetAssignmentEpoch;
+
+ /**
+ * The target assignment.
+ */
+ private TasksTuple targetAssignment;
+
+ /**
+ * A function which returns the current process ID of an active task or
null if the active task
+ * is not assigned. The current process ID is the process ID of the
current owner.
+ */
+ private BiFunction<String, Integer, String> currentActiveTaskProcessId;
+
+ /**
+ * A function which returns the current process IDs of a standby task or
null if the standby
+ * task is not assigned. The current process IDs are the process IDs of
all current owners.
+ */
+ private BiFunction<String, Integer, Set<String>>
currentStandbyTaskProcessIds;
+
+ /**
+ * A function which returns the current process IDs of a warmup task or
null if the warmup task
+ * is not assigned. The current process IDs are the process IDs of all
current owners.
+ */
+ private BiFunction<String, Integer, Set<String>>
currentWarmupTaskProcessIds;
+
+ /**
+ * The tasks owned by the member. This may be provided by the member in
the StreamsGroupHeartbeat request.
+ */
+ private Optional<TasksTuple> ownedTasks = Optional.empty();
+
+ /**
+ * Constructs the CurrentAssignmentBuilder based on the current state of
the provided streams group member.
+ *
+ * @param member The streams group member that must be reconciled.
+ */
+ public CurrentAssignmentBuilder(StreamsGroupMember member) {
+ this.member = Objects.requireNonNull(member);
+ }
+
+ /**
+ * Sets the target assignment epoch and the target assignment that the
streams group member must be reconciled to.
+ *
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment The target assignment.
+ * @return This object.
+ */
+ public CurrentAssignmentBuilder withTargetAssignment(int
targetAssignmentEpoch,
+ TasksTuple
targetAssignment) {
+ this.targetAssignmentEpoch = targetAssignmentEpoch;
+ this.targetAssignment = Objects.requireNonNull(targetAssignment);
+ return this;
+ }
+
+ /**
+ * Sets a BiFunction which allows to retrieve the current process ID of an
active task. This is
+ * used by the state machine to determine if an active task is free or
still used by another
+ * member, and if there is still a task on a specific process that is not
yet revoked.
+ *
+ * @param currentActiveTaskProcessId A BiFunction which gets the process
ID of a subtopology ID /
+ * partition ID pair.
+ * @return This object.
+ */
+ public CurrentAssignmentBuilder
withCurrentActiveTaskProcessId(BiFunction<String, Integer, String>
currentActiveTaskProcessId) {
+ this.currentActiveTaskProcessId =
Objects.requireNonNull(currentActiveTaskProcessId);
+ return this;
+ }
+
+ /**
+ * Sets a BiFunction which allows to retrieve the current process IDs of a
standby task. This is
+ * used by the state machine to determine if there is still a task on a
specific process that is
+ * not yet revoked.
+ *
+ * @param currentStandbyTaskProcessIds A BiFunction which gets the process
IDs of a subtopology
+ * ID / partition ID pair.
+ * @return This object.
+ */
+ public CurrentAssignmentBuilder withCurrentStandbyTaskProcessIds(
+ BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds
+ ) {
+ this.currentStandbyTaskProcessIds =
Objects.requireNonNull(currentStandbyTaskProcessIds);
+ return this;
+ }
+
+ /**
+ * Sets a BiFunction which allows to retrieve the current process IDs of a
warmup task. This is
+ * used by the state machine to determine if there is still a task on a
specific process that is
+ * not yet revoked.
+ *
+ * @param currentWarmupTaskProcessIds A BiFunction which gets the process
IDs of a subtopology ID
+ * / partition ID pair.
+ * @return This object.
+ */
+ public CurrentAssignmentBuilder
withCurrentWarmupTaskProcessIds(BiFunction<String, Integer, Set<String>>
currentWarmupTaskProcessIds) {
+ this.currentWarmupTaskProcessIds =
Objects.requireNonNull(currentWarmupTaskProcessIds);
+ return this;
+ }
+
+ /**
+ * Sets the tasks currently owned by the member. This comes directly from
the last StreamsGroupHeartbeat request. This is used to
+ * determine if the member has revoked the necessary tasks. Passing null
into this function means that the member did not provide
+ * its owned tasks in this heartbeat.
+ *
+ * @param ownedAssignment A collection of active, standby and warm-up tasks
+ * @return This object.
+ */
+ protected CurrentAssignmentBuilder withOwnedAssignment(TasksTuple
ownedAssignment) {
+ this.ownedTasks = Optional.ofNullable(ownedAssignment);
+ return this;
+ }
+
+ /**
+ * Builds the next state for the member or keep the current one if it is
not possible to move forward with the current state.
+ *
+ * @return A new StreamsGroupMember or the current one.
+ */
+ public StreamsGroupMember build() {
+ switch (member.state()) {
+ case STABLE:
+ // When the member is in the STABLE state, we verify if a newer
+ // epoch (or target assignment) is available. If it is, we can
+ // reconcile the member towards it. Otherwise, we return.
+ if (member.memberEpoch() != targetAssignmentEpoch) {
+ return computeNextAssignment(
+ member.memberEpoch(),
+ member.assignedTasks()
+ );
+ } else {
+ return member;
+ }
+
+ case UNREVOKED_TASKS:
+ // When the member is in the UNREVOKED_TASKS state, we wait
+ // until the member has revoked the necessary tasks. They are
+ // considered revoked when they are not anymore reported in the
+ // owned tasks set in the StreamsGroupHeartbeat API.
+
+ // If the member provides its owned tasks, we verify if it
still
+ // owns any of the revoked tasks. If it did not provide it's
+ // owned tasks, or we still own some of the revoked tasks, we
+ // cannot progress.
+ if (
+ ownedTasks.isEmpty() ||
ownedTasks.get().containsAny(member.tasksPendingRevocation())
+ ) {
+ return member;
+ }
+
+ // When the member has revoked all the pending tasks, it can
+ // transition to the next epoch (current + 1) and we can
reconcile
+ // its state towards the latest target assignment.
+ return computeNextAssignment(
+ member.memberEpoch() + 1,
+ member.assignedTasks()
+ );
+
+ case UNRELEASED_TASKS:
+ // When the member is in the UNRELEASED_TASKS, we reconcile the
+ // member towards the latest target assignment. This will
assign any
+ // of the unreleased tasks when they become available.
+ return computeNextAssignment(
+ member.memberEpoch(),
+ member.assignedTasks()
+ );
+
+ case UNKNOWN:
+ // We could only end up in this state if a new state is added
in the
+ // future and the group coordinator is downgraded. In this
case, the
+ // best option is to fence the member to force it to rejoin
the group
+ // without any tasks and to reconcile it again from scratch.
+ if ((ownedTasks.isEmpty() || !ownedTasks.get().isEmpty())) {
+ throw new FencedMemberEpochException(
+ "The streams group member is in a unknown state. "
+ + "The member must abandon all its tasks and
rejoin.");
+ }
+
+ return computeNextAssignment(
+ targetAssignmentEpoch,
+ member.assignedTasks()
+ );
+ }
+
+ return member;
+ }
+
+ /**
+ * Takes the current currentAssignment and the targetAssignment, and
generates three
+ * collections:
+ *
+ * - the resultAssignedTasks: the tasks that are assigned in both the
current and target
+ * assignments.
+ * - the resultTasksPendingRevocation: the tasks that are assigned in the
current
+ * assignment but not in the target assignment.
+ * - the resultTasksPendingAssignment: the tasks that are assigned in the
target assignment but
+ * not in the current assignment, and can be assigned currently (i.e.,
they are not owned by
+ * another member, as defined by the `isUnreleasedTask` predicate).
+ */
+ private boolean computeAssignmentDifference(Map<String, Set<Integer>>
currentAssignment,
+ Map<String, Set<Integer>>
targetAssignment,
+ Map<String, Set<Integer>>
resultAssignedTasks,
+ Map<String, Set<Integer>>
resultTasksPendingRevocation,
+ Map<String, Set<Integer>>
resultTasksPendingAssignment,
+ BiPredicate<String, Integer>
isUnreleasedTask) {
+ boolean hasUnreleasedTasks = false;
+
+ Set<String> allSubtopologyIds = new
HashSet<>(targetAssignment.keySet());
+ allSubtopologyIds.addAll(currentAssignment.keySet());
+
+ for (String subtopologyId : allSubtopologyIds) {
+ hasUnreleasedTasks |= computeAssignmentDifferenceForOneSubtopology(
+ subtopologyId,
+ currentAssignment.getOrDefault(subtopologyId,
Collections.emptySet()),
+ targetAssignment.getOrDefault(subtopologyId,
Collections.emptySet()),
+ resultAssignedTasks,
+ resultTasksPendingRevocation,
+ resultTasksPendingAssignment,
+ isUnreleasedTask
+ );
+ }
+ return hasUnreleasedTasks;
+ }
+
+ private static boolean computeAssignmentDifferenceForOneSubtopology(final
String subtopologyId,
+ final
Set<Integer> currentTasksForThisSubtopology,
+ final
Set<Integer> targetTasksForThisSubtopology,
+ final
Map<String, Set<Integer>> resultAssignedTasks,
+ final
Map<String, Set<Integer>> resultTasksPendingRevocation,
+ final
Map<String, Set<Integer>> resultTasksPendingAssignment,
+ final
BiPredicate<String, Integer> isUnreleasedTask) {
+ // Result Assigned Tasks = Current Tasks ∩ Target Tasks
+ // i.e. we remove all tasks from the current assignment that are not
in the target
+ // assignment
+ Set<Integer> resultAssignedTasksForThisSubtopology = new
HashSet<>(currentTasksForThisSubtopology);
+
resultAssignedTasksForThisSubtopology.retainAll(targetTasksForThisSubtopology);
+
+ // Result Tasks Pending Revocation = Current Tasks - Result Assigned
Tasks
+ // i.e. we will ask the member to revoke all tasks in its current
assignment that
+ // are not in the target assignment
+ Set<Integer> resultTasksPendingRevocationForThisSubtopology = new
HashSet<>(currentTasksForThisSubtopology);
+
resultTasksPendingRevocationForThisSubtopology.removeAll(resultAssignedTasksForThisSubtopology);
+
+ // Result Tasks Pending Assignment = Target Tasks - Result Assigned
Tasks - Unreleased Tasks
+ // i.e. we will ask the member to assign all tasks in its target
assignment,
+ // except those that are already assigned, and those that are
unreleased
+ Set<Integer> resultTasksPendingAssignmentForThisSubtopology = new
HashSet<>(targetTasksForThisSubtopology);
+
resultTasksPendingAssignmentForThisSubtopology.removeAll(resultAssignedTasksForThisSubtopology);
+ boolean hasUnreleasedTasks =
resultTasksPendingAssignmentForThisSubtopology.removeIf(taskId ->
+ isUnreleasedTask.test(subtopologyId, taskId)
+ );
+
+ if (!resultAssignedTasksForThisSubtopology.isEmpty()) {
+ resultAssignedTasks.put(subtopologyId,
resultAssignedTasksForThisSubtopology);
+ }
+
+ if (!resultTasksPendingRevocationForThisSubtopology.isEmpty()) {
+ resultTasksPendingRevocation.put(subtopologyId,
resultTasksPendingRevocationForThisSubtopology);
+ }
+
+ if (!resultTasksPendingAssignmentForThisSubtopology.isEmpty()) {
+ resultTasksPendingAssignment.put(subtopologyId,
resultTasksPendingAssignmentForThisSubtopology);
+ }
+
+ return hasUnreleasedTasks;
+ }
+
+ /**
+ * Computes the next assignment.
+ *
+ * @param memberEpoch The epoch of the member to use. This may be
different from
+ * the epoch in {@link
CurrentAssignmentBuilder#member}.
+ * @param memberAssignedTasks The assigned tasks of the member to use.
+ * @return A new StreamsGroupMember.
+ */
+ private StreamsGroupMember computeNextAssignment(int memberEpoch,
+ TasksTuple
memberAssignedTasks) {
+ Map<String, Set<Integer>> newActiveAssignedTasks = new HashMap<>();
+ Map<String, Set<Integer>> newActiveTasksPendingRevocation = new
HashMap<>();
+ Map<String, Set<Integer>> newActiveTasksPendingAssignment = new
HashMap<>();
+ Map<String, Set<Integer>> newStandbyAssignedTasks = new HashMap<>();
+ Map<String, Set<Integer>> newStandbyTasksPendingRevocation = new
HashMap<>();
+ Map<String, Set<Integer>> newStandbyTasksPendingAssignment = new
HashMap<>();
+ Map<String, Set<Integer>> newWarmupAssignedTasks = new HashMap<>();
+ Map<String, Set<Integer>> newWarmupTasksPendingRevocation = new
HashMap<>();
+ Map<String, Set<Integer>> newWarmupTasksPendingAssignment = new
HashMap<>();
+
+ boolean hasUnreleasedActiveTasks = computeAssignmentDifference(
+ memberAssignedTasks.activeTasks(),
+ targetAssignment.activeTasks(),
+ newActiveAssignedTasks,
+ newActiveTasksPendingRevocation,
+ newActiveTasksPendingAssignment,
+ (subtopologyId, partitionId) ->
+ currentActiveTaskProcessId.apply(subtopologyId, partitionId)
!= null ||
+ currentStandbyTaskProcessIds.apply(subtopologyId,
partitionId)
+ .contains(member.processId()) ||
+ currentWarmupTaskProcessIds.apply(subtopologyId,
partitionId)
+ .contains(member.processId())
+ );
+
+ boolean hasUnreleasedStandbyTasks = computeAssignmentDifference(
+ memberAssignedTasks.standbyTasks(),
+ targetAssignment.standbyTasks(),
+ newStandbyAssignedTasks,
+ newStandbyTasksPendingRevocation,
+ newStandbyTasksPendingAssignment,
+ (subtopologyId, partitionId) ->
+ Objects.equals(currentActiveTaskProcessId.apply(subtopologyId,
partitionId),
+ member.processId()) ||
+ currentStandbyTaskProcessIds.apply(subtopologyId,
partitionId)
+ .contains(member.processId()) ||
+ currentWarmupTaskProcessIds.apply(subtopologyId,
partitionId)
+ .contains(member.processId())
+ );
+
+ boolean hasUnreleasedWarmupTasks = computeAssignmentDifference(
+ memberAssignedTasks.warmupTasks(),
+ targetAssignment.warmupTasks(),
+ newWarmupAssignedTasks,
+ newWarmupTasksPendingRevocation,
+ newWarmupTasksPendingAssignment,
+ (subtopologyId, partitionId) ->
+ Objects.equals(currentActiveTaskProcessId.apply(subtopologyId,
partitionId),
+ member.processId()) ||
+ currentStandbyTaskProcessIds.apply(subtopologyId,
partitionId)
+ .contains(member.processId()) ||
+ currentWarmupTaskProcessIds.apply(subtopologyId,
partitionId)
+ .contains(member.processId())
+ );
+
+ return buildNewMember(
+ memberEpoch,
+ new TasksTuple(
+ newActiveTasksPendingRevocation,
+ newStandbyTasksPendingRevocation,
+ newWarmupTasksPendingRevocation
+ ),
+ new TasksTuple(
+ newActiveAssignedTasks,
+ newStandbyAssignedTasks,
+ newWarmupAssignedTasks
+ ),
+ new TasksTuple(
+ newActiveTasksPendingAssignment,
+ newStandbyTasksPendingAssignment,
+ newWarmupTasksPendingAssignment
+ ),
+ hasUnreleasedActiveTasks || hasUnreleasedStandbyTasks ||
hasUnreleasedWarmupTasks
+ );
+ }
+
+ private StreamsGroupMember buildNewMember(final int memberEpoch,
+ final TasksTuple
newTasksPendingRevocation,
+ final TasksTuple
newAssignedTasks,
+ final TasksTuple
newTasksPendingAssignment,
+ final boolean
hasUnreleasedTasks) {
+
+ final boolean hasTasksToBeRevoked =
+ (!newTasksPendingRevocation.isEmpty())
+ && (ownedTasks.isEmpty() ||
ownedTasks.get().containsAny(newTasksPendingRevocation));
+
+ if (hasTasksToBeRevoked) {
+ // If there are tasks to be revoked, the member remains in its
current
+ // epoch and requests the revocation of those tasks. It
transitions to
+ // the UNREVOKED_TASKS state to wait until the client acknowledges
the
+ // revocation of the tasks.
+ return new StreamsGroupMember.Builder(member)
+ .setState(MemberState.UNREVOKED_TASKS)
+ .updateMemberEpoch(memberEpoch)
+ .setAssignedTasks(newAssignedTasks)
+ .setTasksPendingRevocation(newTasksPendingRevocation)
+ .build();
+ } else if (!newTasksPendingAssignment.isEmpty()) {
+ // If there are tasks to be assigned, the member transitions to the
+ // target epoch and requests the assignment of those tasks. Note
that
+ // the tasks are directly added to the assigned tasks set. The
+ // member transitions to the STABLE state or to the
UNRELEASED_TASKS
+ // state depending on whether there are unreleased tasks or not.
+ MemberState newState =
+ hasUnreleasedTasks
+ ? MemberState.UNRELEASED_TASKS
+ : MemberState.STABLE;
+ return new StreamsGroupMember.Builder(member)
+ .setState(newState)
+ .updateMemberEpoch(targetAssignmentEpoch)
+
.setAssignedTasks(newAssignedTasks.merge(newTasksPendingAssignment))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+ } else if (hasUnreleasedTasks) {
+ // If there are no tasks to be revoked nor to be assigned but some
+ // tasks are not available yet, the member transitions to the
target
+ // epoch, to the UNRELEASED_TASKS state and waits.
+ return new StreamsGroupMember.Builder(member)
+ .setState(MemberState.UNRELEASED_TASKS)
+ .updateMemberEpoch(targetAssignmentEpoch)
+ .setAssignedTasks(newAssignedTasks)
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+ } else {
+ // Otherwise, the member transitions to the target epoch and to the
+ // STABLE state.
+ return new StreamsGroupMember.Builder(member)
+ .setState(MemberState.STABLE)
+ .updateMemberEpoch(targetAssignmentEpoch)
+ .setAssignedTasks(newAssignedTasks)
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+ }
+ }
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
index e23df3f5701..612e72fabdd 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -49,18 +49,8 @@ import java.util.stream.Collectors;
* @param userEndpoint The user endpoint exposed for
Interactive Queries by the Streams client that
* contains the member.
* @param clientTags Tags of the client of the member used
for rack-aware assignment.
- * @param assignedActiveTasks Active tasks assigned to the member.
- * The key of the map is the subtopology
ID and the value is the set of partition IDs.
- * @param assignedStandbyTasks Standby tasks assigned to the member.
- * The key of the map is the subtopology
ID and the value is the set of partition IDs.
- * @param assignedWarmupTasks Warm-up tasks assigned to the member.
- * The key of the map is the subtopology
ID and the value is the set of partition IDs.
- * @param activeTasksPendingRevocation Active tasks assigned to the member
pending revocation.
- * The key of the map is the subtopology
ID and the value is the set of partition IDs.
- * @param standbyTasksPendingRevocation Standby tasks assigned to the member
pending revocation.
- * The key of the map is the subtopology
ID and the value is the set of partition IDs.
- * @param warmupTasksPendingRevocation Warm-up tasks assigned to the member
pending revocation.
- * The key of the map is the subtopology
ID and the value is the set of partition IDs.
+ * @param assignedTasks Tasks assigned to the member.
+ * @param tasksPendingRevocation Tasks owned by the member pending
revocation.
*/
@SuppressWarnings("checkstyle:JavaNCSS")
public record StreamsGroupMember(String memberId,
@@ -76,22 +66,12 @@ public record StreamsGroupMember(String memberId,
String processId,
Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint,
Map<String, String> clientTags,
- Map<String, Set<Integer>> assignedActiveTasks,
- Map<String, Set<Integer>>
assignedStandbyTasks,
- Map<String, Set<Integer>> assignedWarmupTasks,
- Map<String, Set<Integer>>
activeTasksPendingRevocation,
- Map<String, Set<Integer>>
standbyTasksPendingRevocation,
- Map<String, Set<Integer>>
warmupTasksPendingRevocation) {
+ TasksTuple assignedTasks,
+ TasksTuple tasksPendingRevocation) {
public StreamsGroupMember {
Objects.requireNonNull(memberId, "memberId cannot be null");
clientTags = clientTags != null ?
Collections.unmodifiableMap(clientTags) : null;
- assignedActiveTasks = assignedActiveTasks != null ?
Collections.unmodifiableMap(assignedActiveTasks) : null;
- assignedStandbyTasks = assignedStandbyTasks != null ?
Collections.unmodifiableMap(assignedStandbyTasks) : null;
- assignedWarmupTasks = assignedWarmupTasks != null ?
Collections.unmodifiableMap(assignedWarmupTasks) : null;
- activeTasksPendingRevocation = activeTasksPendingRevocation != null ?
Collections.unmodifiableMap(activeTasksPendingRevocation) : null;
- standbyTasksPendingRevocation = standbyTasksPendingRevocation != null
? Collections.unmodifiableMap(standbyTasksPendingRevocation) : null;
- warmupTasksPendingRevocation = warmupTasksPendingRevocation != null ?
Collections.unmodifiableMap(warmupTasksPendingRevocation) : null;
}
/**
@@ -114,12 +94,8 @@ public record StreamsGroupMember(String memberId,
private String processId = null;
private Optional<StreamsGroupMemberMetadataValue.Endpoint>
userEndpoint = null;
private Map<String, String> clientTags = null;
- private Map<String, Set<Integer>> assignedActiveTasks = null;
- private Map<String, Set<Integer>> assignedStandbyTasks = null;
- private Map<String, Set<Integer>> assignedWarmupTasks = null;
- private Map<String, Set<Integer>> activeTasksPendingRevocation = null;
- private Map<String, Set<Integer>> standbyTasksPendingRevocation = null;
- private Map<String, Set<Integer>> warmupTasksPendingRevocation = null;
+ private TasksTuple assignedTasks = null;
+ private TasksTuple tasksPendingRevocation = null;
public Builder(String memberId) {
this.memberId = Objects.requireNonNull(memberId, "memberId cannot
be null");
@@ -141,12 +117,8 @@ public record StreamsGroupMember(String memberId,
this.userEndpoint = member.userEndpoint;
this.clientTags = member.clientTags;
this.state = member.state;
- this.assignedActiveTasks = member.assignedActiveTasks;
- this.assignedStandbyTasks = member.assignedStandbyTasks;
- this.assignedWarmupTasks = member.assignedWarmupTasks;
- this.activeTasksPendingRevocation =
member.activeTasksPendingRevocation;
- this.standbyTasksPendingRevocation =
member.standbyTasksPendingRevocation;
- this.warmupTasksPendingRevocation =
member.warmupTasksPendingRevocation;
+ this.assignedTasks = member.assignedTasks;
+ this.tasksPendingRevocation = member.tasksPendingRevocation;
}
public Builder updateMemberEpoch(int memberEpoch) {
@@ -251,50 +223,13 @@ public record StreamsGroupMember(String memberId,
return this;
}
- public Builder setAssignment(Assignment assignment) {
- this.assignedActiveTasks = assignment.activeTasks();
- this.assignedStandbyTasks = assignment.standbyTasks();
- this.assignedWarmupTasks = assignment.warmupTasks();
+ public Builder setAssignedTasks(TasksTuple assignedTasks) {
+ this.assignedTasks = assignedTasks;
return this;
}
- public Builder setAssignedActiveTasks(Map<String, Set<Integer>>
assignedActiveTasks) {
- this.assignedActiveTasks = assignedActiveTasks;
- return this;
- }
-
- public Builder setAssignedStandbyTasks(Map<String, Set<Integer>>
assignedStandbyTasks) {
- this.assignedStandbyTasks = assignedStandbyTasks;
- return this;
- }
-
- public Builder setAssignedWarmupTasks(Map<String, Set<Integer>>
assignedWarmupTasks) {
- this.assignedWarmupTasks = assignedWarmupTasks;
- return this;
- }
-
- public Builder setAssignmentPendingRevocation(Assignment assignment) {
- this.activeTasksPendingRevocation = assignment.activeTasks();
- this.standbyTasksPendingRevocation = assignment.standbyTasks();
- this.warmupTasksPendingRevocation = assignment.warmupTasks();
- return this;
- }
-
- public Builder setActiveTasksPendingRevocation(
- Map<String, Set<Integer>> activeTasksPendingRevocation) {
- this.activeTasksPendingRevocation = activeTasksPendingRevocation;
- return this;
- }
-
- public Builder setStandbyTasksPendingRevocation(
- Map<String, Set<Integer>> standbyTasksPendingRevocation) {
- this.standbyTasksPendingRevocation = standbyTasksPendingRevocation;
- return this;
- }
-
- public Builder setWarmupTasksPendingRevocation(
- Map<String, Set<Integer>> warmupTasksPendingRevocation) {
- this.warmupTasksPendingRevocation = warmupTasksPendingRevocation;
+ public Builder setTasksPendingRevocation(TasksTuple
tasksPendingRevocation) {
+ this.tasksPendingRevocation = tasksPendingRevocation;
return this;
}
@@ -318,15 +253,20 @@ public record StreamsGroupMember(String memberId,
setMemberEpoch(record.memberEpoch());
setPreviousMemberEpoch(record.previousMemberEpoch());
setState(MemberState.fromValue(record.state()));
-
setAssignedActiveTasks(assignmentFromTaskIds(record.activeTasks()));
-
setAssignedStandbyTasks(assignmentFromTaskIds(record.standbyTasks()));
-
setAssignedWarmupTasks(assignmentFromTaskIds(record.warmupTasks()));
- setActiveTasksPendingRevocation(
- assignmentFromTaskIds(record.activeTasksPendingRevocation()));
- setStandbyTasksPendingRevocation(
- assignmentFromTaskIds(record.standbyTasksPendingRevocation()));
- setWarmupTasksPendingRevocation(
- assignmentFromTaskIds(record.warmupTasksPendingRevocation()));
+ setAssignedTasks(
+ new TasksTuple(
+ assignmentFromTaskIds(record.activeTasks()),
+ assignmentFromTaskIds(record.standbyTasks()),
+ assignmentFromTaskIds(record.warmupTasks())
+ )
+ );
+ setTasksPendingRevocation(
+ new TasksTuple(
+
assignmentFromTaskIds(record.activeTasksPendingRevocation()),
+
assignmentFromTaskIds(record.standbyTasksPendingRevocation()),
+
assignmentFromTaskIds(record.warmupTasksPendingRevocation())
+ )
+ );
return this;
}
@@ -353,12 +293,8 @@ public record StreamsGroupMember(String memberId,
processId,
userEndpoint,
clientTags,
- assignedActiveTasks,
- assignedStandbyTasks,
- assignedWarmupTasks,
- activeTasksPendingRevocation,
- standbyTasksPendingRevocation,
- warmupTasksPendingRevocation
+ assignedTasks,
+ tasksPendingRevocation
);
}
}
@@ -377,9 +313,7 @@ public record StreamsGroupMember(String memberId,
*
* @return The StreamsGroupMember mapped as
StreamsGroupDescribeResponseData.Member.
*/
- public StreamsGroupDescribeResponseData.Member
asStreamsGroupDescribeMember(
- Assignment targetAssignment
- ) {
+ public StreamsGroupDescribeResponseData.Member
asStreamsGroupDescribeMember(TasksTuple targetAssignment) {
final StreamsGroupDescribeResponseData.Assignment
describedTargetAssignment =
new StreamsGroupDescribeResponseData.Assignment();
@@ -395,9 +329,9 @@ public record StreamsGroupMember(String memberId,
.setMemberId(memberId)
.setAssignment(
new StreamsGroupDescribeResponseData.Assignment()
- .setActiveTasks(taskIdsFromMap(assignedActiveTasks))
- .setStandbyTasks(taskIdsFromMap(assignedStandbyTasks))
- .setWarmupTasks(taskIdsFromMap(assignedWarmupTasks)))
+
.setActiveTasks(taskIdsFromMap(assignedTasks.activeTasks()))
+
.setStandbyTasks(taskIdsFromMap(assignedTasks.standbyTasks()))
+
.setWarmupTasks(taskIdsFromMap(assignedTasks.warmupTasks())))
.setTargetAssignment(describedTargetAssignment)
.setClientHost(clientHost)
.setClientId(clientId)
@@ -419,9 +353,7 @@ public record StreamsGroupMember(String memberId,
);
}
- private static List<StreamsGroupDescribeResponseData.TaskIds>
taskIdsFromMap(
- Map<String, Set<Integer>> tasks
- ) {
+ private static List<StreamsGroupDescribeResponseData.TaskIds>
taskIdsFromMap(Map<String, Set<Integer>> tasks) {
List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new
ArrayList<>();
tasks.forEach((subtopologyId, partitionSet) -> {
taskIds.add(new StreamsGroupDescribeResponseData.TaskIds()
@@ -432,32 +364,9 @@ public record StreamsGroupMember(String memberId,
}
/**
- * @return True if the two provided members have different assigned active
tasks.
- */
- public static boolean hasAssignedActiveTasksChanged(
- StreamsGroupMember member1,
- StreamsGroupMember member2
- ) {
- return
!member1.assignedActiveTasks().equals(member2.assignedActiveTasks());
- }
-
- /**
- * @return True if the two provided members have different assigned active
tasks.
- */
- public static boolean hasAssignedStandbyTasksChanged(
- StreamsGroupMember member1,
- StreamsGroupMember member2
- ) {
- return
!member1.assignedStandbyTasks().equals(member2.assignedStandbyTasks());
- }
-
- /**
- * @return True if the two provided members have different assigned active
tasks.
+ * @return True if the two provided members have different assigned tasks.
*/
- public static boolean hasAssignedWarmupTasksChanged(
- StreamsGroupMember member1,
- StreamsGroupMember member2
- ) {
- return
!member1.assignedWarmupTasks().equals(member2.assignedWarmupTasks());
+ public static boolean hasAssignedTasksChanged(StreamsGroupMember member1,
StreamsGroupMember member2) {
+ return !member1.assignedTasks().equals(member2.assignedTasks());
}
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTuple.java
similarity index 50%
rename from
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
rename to
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTuple.java
index da377d19ccd..ea4c6f81a52 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTuple.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.streams;
import
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
@@ -26,45 +27,89 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
- * An immutable assignment for a member.
+ * An immutable tuple containing active, standby and warm-up tasks.
*
- * @param activeTasks Active tasks assigned to the member.
+ * @param activeTasks Active tasks.
* The key of the map is the subtopology ID and
the value is the set of partition IDs.
- * @param standbyTasks Standby tasks assigned to the member.
+ * @param standbyTasks Standby tasks.
* The key of the map is the subtopology ID and
the value is the set of partition IDs.
- * @param warmupTasks Warm-up tasks assigned to the member.
+ * @param warmupTasks Warm-up tasks.
* The key of the map is the subtopology ID and
the value is the set of partition IDs.
*/
-public record Assignment(Map<String, Set<Integer>> activeTasks,
+public record TasksTuple(Map<String, Set<Integer>> activeTasks,
Map<String, Set<Integer>> standbyTasks,
Map<String, Set<Integer>> warmupTasks) {
- public Assignment {
+ public TasksTuple {
activeTasks =
Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
standbyTasks =
Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
warmupTasks =
Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));
}
/**
- * An empty assignment.
+ * An empty task tuple.
*/
- public static final Assignment EMPTY = new Assignment(
+ public static final TasksTuple EMPTY = new TasksTuple(
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap()
);
/**
- * Creates a {{@link
org.apache.kafka.coordinator.group.streams.Assignment}} from a
+ * @return true if all collections in the tuple are empty.
+ */
+ public boolean isEmpty() {
+ return activeTasks.isEmpty() && standbyTasks.isEmpty() &&
warmupTasks.isEmpty();
+ }
+
+ /**
+ * Merges this task tuple with another task tuple.
+ *
+ * @param other The other task tuple.
+ * @return A new task tuple, containing all active tasks, standby tasks
and warm-up tasks from both tuples.
+ */
+ public TasksTuple merge(TasksTuple other) {
+ Map<String, Set<Integer>> mergedActiveTasks = merge(activeTasks,
other.activeTasks);
+ Map<String, Set<Integer>> mergedStandbyTasks = merge(standbyTasks,
other.standbyTasks);
+ Map<String, Set<Integer>> mergedWarmupTasks = merge(warmupTasks,
other.warmupTasks);
+ return new TasksTuple(mergedActiveTasks, mergedStandbyTasks,
mergedWarmupTasks);
+ }
+
+ private static Map<String, Set<Integer>> merge(final Map<String,
Set<Integer>> tasks1, final Map<String, Set<Integer>> tasks2) {
+ HashMap<String, Set<Integer>> result = new HashMap<>();
+ tasks1.forEach((subtopologyId, tasks) ->
+ result.put(subtopologyId, new HashSet<>(tasks)));
+ tasks2.forEach((subtopologyId, tasks) -> result
+ .computeIfAbsent(subtopologyId, __ -> new HashSet<>())
+ .addAll(tasks));
+ return result;
+ }
+
+ /**
+ * Checks if this task tuple contains any of the tasks in another task
tuple.
+ *
+ * @param other The other task tuple.
+ * @return true if there is at least one active, standby or warm-up task
that is present in both tuples.
+ */
+ public boolean containsAny(TasksTuple other) {
+ return activeTasks.entrySet().stream().anyMatch(
+ entry -> other.activeTasks.containsKey(entry.getKey()) &&
!Collections.disjoint(entry.getValue(), other.activeTasks.get(entry.getKey()))
+ ) || standbyTasks.entrySet().stream().anyMatch(
+ entry -> other.standbyTasks.containsKey(entry.getKey()) &&
!Collections.disjoint(entry.getValue(), other.standbyTasks.get(entry.getKey()))
+ ) || warmupTasks.entrySet().stream().anyMatch(
+ entry -> other.warmupTasks.containsKey(entry.getKey()) &&
!Collections.disjoint(entry.getValue(), other.warmupTasks.get(entry.getKey()))
+ );
+ }
+
+ /**
+ * Creates a {{@link TasksTuple}} from a
* {{@link
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue}}.
*
* @param record The record.
- * @return A {{@link
org.apache.kafka.coordinator.group.streams.Assignment}}.
+ * @return A {{@link TasksTuple}}.
*/
- public static Assignment fromRecord(
- StreamsGroupTargetAssignmentMemberValue record
- ) {
- return new Assignment(
+ public static TasksTuple
fromTargetAssignmentRecord(StreamsGroupTargetAssignmentMemberValue record) {
+ return new TasksTuple(
record.activeTasks().stream()
.collect(Collectors.toMap(
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupAssignment.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupAssignment.java
index a97cdc33b79..c4ac5803b35 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupAssignment.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupAssignment.java
@@ -22,7 +22,7 @@ import java.util.Objects;
/**
* The task assignment for a streams group.
*
- * @param members The member assignments keyed by member id.
+ * @param members The member assignments keyed by member ID.
*/
public record GroupAssignment(Map<String, MemberAssignment> members) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
new file mode 100644
index 00000000000..4d55a8419fb
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
@@ -0,0 +1,825 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+
+import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksTuple;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class CurrentAssignmentBuilderTest {
+
+ private static final String SUBTOPOLOGY_ID1 = Uuid.randomUuid().toString();
+ private static final String SUBTOPOLOGY_ID2 = Uuid.randomUuid().toString();
+ private static final String PROCESS_ID = "process_id";
+ private static final String MEMBER_NAME = "member";
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testStableToStable(TaskRole taskRole) {
+ final int memberEpoch = 10;
+
+ StreamsGroupMember member =
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.STABLE)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(
+ mkTasksTuple(
+ taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
PROCESS_ID)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
Collections.emptySet())
+ .build();
+
+ assertEquals(
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.STABLE)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch + 1)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(
+ taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build(),
+ updatedMember
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testStableToStableAtTargetEpoch(TaskRole taskRole) {
+ final int memberEpoch = 10;
+
+ StreamsGroupMember member =
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.STABLE)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(
+ mkTasksTuple(
+ taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
PROCESS_ID)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
Collections.emptySet())
+ .build();
+
+ assertEquals(
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.STABLE)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(
+ taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build(),
+ updatedMember
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testStableToStableWithNewTasks(TaskRole taskRole) {
+ final int memberEpoch = 10;
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.STABLE)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2, 4),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4, 7)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
null)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
Collections.emptySet())
+ .build();
+
+ assertEquals(
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.STABLE)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch + 1)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2, 4),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4, 7)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build(),
+ updatedMember
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testStableToUnrevokedTasks(TaskRole taskRole) {
+ final int memberEpoch = 10;
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.STABLE)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 4, 5)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
null)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
Collections.emptySet())
+ .build();
+
+ assertEquals(
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNREVOKED_TASKS)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 4)))
+ .setTasksPendingRevocation(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1),
+ mkTasks(SUBTOPOLOGY_ID2, 3)))
+ .build(),
+ updatedMember
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testStableToUnrevokedWithEmptyAssignment(TaskRole taskRole) {
+ final int memberEpoch = 10;
+
+ StreamsGroupMember member =
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.STABLE)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(
+ mkTasksTuple(
+ taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch + 1, TasksTuple.EMPTY)
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
PROCESS_ID)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
Collections.emptySet())
+ .build();
+
+ assertEquals(
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNREVOKED_TASKS)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(TasksTuple.EMPTY)
+ .setTasksPendingRevocation(
+ mkTasksTuple(
+ taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+ .build(),
+ updatedMember
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testStableToUnreleasedTasks(TaskRole taskRole) {
+ final int memberEpoch = 10;
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.STABLE)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2, 4),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4, 7)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
PROCESS_ID)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
Collections.emptySet())
+ .build();
+
+ assertEquals(
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNRELEASED_TASKS)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch + 1)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build(),
+ updatedMember
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void
testStableToUnreleasedTasksWithOwnedTasksNotHavingRevokedTasks(TaskRole
taskRole) {
+ final int memberEpoch = 10;
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.STABLE)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3, 5)))
+ .withCurrentActiveTaskProcessId((subtopologyId, __) ->
+ SUBTOPOLOGY_ID2.equals(subtopologyId) ? PROCESS_ID : null
+ )
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
Collections.emptySet())
+ .withOwnedAssignment(mkTasksTuple(taskRole))
+ .build();
+
+ assertEquals(
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNRELEASED_TASKS)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch + 1)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 3)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build(),
+ updatedMember
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testUnrevokedTasksToStable(TaskRole taskRole) {
+ final int memberEpoch = 10;
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNREVOKED_TASKS)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .setTasksPendingRevocation(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1),
+ mkTasks(SUBTOPOLOGY_ID2, 4)))
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
null)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
Collections.emptySet())
+ .withOwnedAssignment(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .build();
+
+ assertEquals(
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.STABLE)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch + 1)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build(),
+ updatedMember
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testRemainsInUnrevokedTasks(TaskRole taskRole) {
+ final int memberEpoch = 10;
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNREVOKED_TASKS)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .setTasksPendingRevocation(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1),
+ mkTasks(SUBTOPOLOGY_ID2, 4)))
+ .build();
+
+ CurrentAssignmentBuilder currentAssignmentBuilder = new
CurrentAssignmentBuilder(
+ member)
+ .withTargetAssignment(memberEpoch + 2, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 6)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
null)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet());
+
+ assertEquals(
+ member,
+ currentAssignmentBuilder
+ .withOwnedAssignment(null)
+ .build()
+ );
+
+ assertEquals(
+ member,
+ currentAssignmentBuilder
+ .withOwnedAssignment(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .build()
+ );
+
+ assertEquals(
+ member,
+ currentAssignmentBuilder
+ .withOwnedAssignment(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 4, 5, 6)))
+ .build()
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testUnrevokedTasksToUnrevokedTasks(TaskRole taskRole) {
+ final int memberEpoch = 10;
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNREVOKED_TASKS)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .setTasksPendingRevocation(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1),
+ mkTasks(SUBTOPOLOGY_ID2, 4)))
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch + 2, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 6)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
null)
+ .withOwnedAssignment(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .build();
+
+ assertEquals(
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNREVOKED_TASKS)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch + 1)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 6)))
+ .setTasksPendingRevocation(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 5)))
+ .build(),
+ updatedMember
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testUnrevokedTasksToUnreleasedTasks(TaskRole taskRole) {
+ final int memberEpoch = 11;
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNREVOKED_TASKS)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch - 1)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .setTasksPendingRevocation(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 1),
+ mkTasks(SUBTOPOLOGY_ID2, 4)))
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
PROCESS_ID)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
Collections.emptySet())
+ .withOwnedAssignment(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6))
+ )
+ .build();
+
+ assertEquals(
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNRELEASED_TASKS)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build(),
+ updatedMember
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testUnreleasedTasksToStable(TaskRole taskRole) {
+ final int memberEpoch = 11;
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNRELEASED_TASKS)
+ .setProcessId("process1")
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
PROCESS_ID)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) ->
Collections.singleton(PROCESS_ID))
+ .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
+ Collections.singleton(PROCESS_ID))
+ .build();
+
+ assertEquals(
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.STABLE)
+ .setProcessId("process1")
+ .setMemberEpoch(memberEpoch + 1)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build(),
+ updatedMember
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testUnreleasedTasksToStableWithNewTasks(TaskRole taskRole) {
+ int memberEpoch = 11;
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNRELEASED_TASKS)
+ .setProcessId("process1")
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
null)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
Collections.emptySet())
+ .build();
+
+ assertEquals(
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.STABLE)
+ .setProcessId("process1")
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build(),
+ updatedMember
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testUnreleasedTasksToUnreleasedTasks(TaskRole taskRole) {
+ int memberEpoch = 11;
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNRELEASED_TASKS)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
PROCESS_ID)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) ->
Collections.singleton(PROCESS_ID))
+ .withCurrentWarmupTaskProcessIds(
+ (subtopologyId, partitionId) ->
Collections.singleton(PROCESS_ID))
+ .build();
+
+ assertEquals(member, updatedMember);
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void
testUnreleasedTasksToUnreleasedTasksOtherUnreleasedTaskRole(TaskRole taskRole) {
+ int memberEpoch = 11;
+
+ // The unreleased task is owned by a task of a different role on the
same process.
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNRELEASED_TASKS)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
null)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> (taskRole == TaskRole.STANDBY)
+ ? Collections.emptySet() :
Collections.singleton(PROCESS_ID))
+ .withCurrentWarmupTaskProcessIds(
+ (subtopologyId, partitionId) -> (taskRole == TaskRole.STANDBY)
+ ? Collections.singleton(PROCESS_ID) :
Collections.emptySet())
+ .build();
+
+ assertEquals(member, updatedMember);
+ }
+
+ @Test
+ public void testUnreleasedTasksToUnreleasedTasksAnyActiveOwner() {
+ int memberEpoch = 11;
+
+ // The unreleased task remains unreleased, because it is owned by any
other instance in
+ // an active role, no matter the process.
+ // The task that is not unreleased can be assigned.
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNRELEASED_TASKS)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(TaskRole.ACTIVE,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .build();
+
+ StreamsGroupMember expectedMember = new
StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNRELEASED_TASKS)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(TaskRole.ACTIVE,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch, mkTasksTuple(TaskRole.ACTIVE,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
+ (subtopologyId.equals(SUBTOPOLOGY_ID1) && partitionId == 4) ?
"anyOtherProcess"
+ : null)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .build();
+
+ assertEquals(expectedMember, updatedMember);
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testUnreleasedTasksToUnrevokedTasks(TaskRole taskRole) {
+ int memberEpoch = 11;
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNRELEASED_TASKS)
+ .setProcessId("process1")
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+ .setTasksPendingRevocation(mkTasksTuple(TaskRole.ACTIVE,
+ mkTasks(SUBTOPOLOGY_ID1, 4),
+ mkTasks(SUBTOPOLOGY_ID2, 7)))
+ .build();
+
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 6)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
PROCESS_ID)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
Collections.emptySet())
+ .build();
+
+ assertEquals(
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNREVOKED_TASKS)
+ .setProcessId("process1")
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 6)))
+ .setTasksPendingRevocation(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 5)))
+ .build(),
+ updatedMember
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testUnknownState(TaskRole taskRole) {
+ int memberEpoch = 11;
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.UNKNOWN)
+ .setMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setProcessId(PROCESS_ID)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 6)))
+ .setTasksPendingRevocation(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 2),
+ mkTasks(SUBTOPOLOGY_ID2, 5)))
+ .build();
+
+ // When the member is in an unknown state, the member is first to force
+ // a reset of the client side member state.
+ assertThrows(FencedMemberEpochException.class, () -> new
CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 6)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
PROCESS_ID)
+ .build());
+
+ // Then the member rejoins with no owned tasks.
+ StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 6)))
+ .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
PROCESS_ID)
+ .withCurrentStandbyTaskProcessIds(
+ (subtopologyId, partitionId) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
Collections.emptySet())
+ .withOwnedAssignment(mkTasksTuple(taskRole))
+ .build();
+
+ assertEquals(
+ new StreamsGroupMember.Builder(MEMBER_NAME)
+ .setState(MemberState.STABLE)
+ .setProcessId(PROCESS_ID)
+ .setMemberEpoch(memberEpoch + 1)
+ .setPreviousMemberEpoch(memberEpoch)
+ .setAssignedTasks(mkTasksTuple(taskRole,
+ mkTasks(SUBTOPOLOGY_ID1, 3),
+ mkTasks(SUBTOPOLOGY_ID2, 6)))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build(),
+ updatedMember
+ );
+ }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
index 8c6d3d9088a..f6c33df6f13 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
@@ -25,14 +25,12 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataVa
import org.junit.jupiter.api.Test;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
-import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -40,8 +38,10 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class StreamsGroupMemberTest {
@@ -70,12 +70,18 @@ public class StreamsGroupMemberTest {
private static final List<Integer> TASKS4 = List.of(3, 2, 1);
private static final List<Integer> TASKS5 = List.of(6, 5, 4);
private static final List<Integer> TASKS6 = List.of(9, 7);
- private static final Map<String, Set<Integer>> ASSIGNED_ACTIVE_TASKS =
mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS1.toArray(Integer[]::new)));
- private static final Map<String, Set<Integer>> ASSIGNED_STANDBY_TASKS =
mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS2.toArray(Integer[]::new)));
- private static final Map<String, Set<Integer>> ASSIGNED_WARMUP_TASKS =
mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS3.toArray(Integer[]::new)));
- private static final Map<String, Set<Integer>>
ACTIVE_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2,
TASKS4.toArray(Integer[]::new)));
- private static final Map<String, Set<Integer>>
STANDBY_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1,
TASKS5.toArray(Integer[]::new)));
- private static final Map<String, Set<Integer>>
WARMUP_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2,
TASKS6.toArray(Integer[]::new)));
+ private static final TasksTuple ASSIGNED_TASKS =
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1,
TASKS1.toArray(Integer[]::new))),
+ mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2,
TASKS2.toArray(Integer[]::new))),
+ mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1,
TASKS3.toArray(Integer[]::new)))
+ );
+ private static final TasksTuple TASKS_PENDING_REVOCATION =
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2,
TASKS4.toArray(Integer[]::new))),
+ mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1,
TASKS5.toArray(Integer[]::new))),
+ mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2,
TASKS6.toArray(Integer[]::new)))
+ );
@Test
public void testBuilderWithMemberIdIsNull() {
@@ -112,12 +118,8 @@ public class StreamsGroupMemberTest {
assertNull(member.processId());
assertNull(member.userEndpoint());
assertNull(member.clientTags());
- assertNull(member.assignedActiveTasks());
- assertNull(member.assignedStandbyTasks());
- assertNull(member.assignedWarmupTasks());
- assertNull(member.activeTasksPendingRevocation());
- assertNull(member.standbyTasksPendingRevocation());
- assertNull(member.warmupTasksPendingRevocation());
+ assertNull(member.assignedTasks());
+ assertNull(member.tasksPendingRevocation());
}
@Test
@@ -136,12 +138,8 @@ public class StreamsGroupMemberTest {
assertEquals(PROCESS_ID, member.processId());
assertEquals(Optional.of(USER_ENDPOINT), member.userEndpoint());
assertEquals(CLIENT_TAGS, member.clientTags());
- assertEquals(ASSIGNED_ACTIVE_TASKS, member.assignedActiveTasks());
- assertEquals(ASSIGNED_STANDBY_TASKS, member.assignedStandbyTasks());
- assertEquals(ASSIGNED_WARMUP_TASKS, member.assignedWarmupTasks());
- assertEquals(ACTIVE_TASKS_PENDING_REVOCATION,
member.activeTasksPendingRevocation());
- assertEquals(STANDBY_TASKS_PENDING_REVOCATION,
member.standbyTasksPendingRevocation());
- assertEquals(WARMUP_TASKS_PENDING_REVOCATION,
member.warmupTasksPendingRevocation());
+ assertEquals(ASSIGNED_TASKS, member.assignedTasks());
+ assertEquals(TASKS_PENDING_REVOCATION,
member.tasksPendingRevocation());
}
@Test
@@ -179,12 +177,8 @@ public class StreamsGroupMemberTest {
assertNull(member.memberEpoch());
assertNull(member.previousMemberEpoch());
assertNull(member.state());
- assertNull(member.assignedActiveTasks());
- assertNull(member.assignedStandbyTasks());
- assertNull(member.assignedWarmupTasks());
- assertNull(member.activeTasksPendingRevocation());
- assertNull(member.standbyTasksPendingRevocation());
- assertNull(member.warmupTasksPendingRevocation());
+ assertNull(member.assignedTasks());
+ assertNull(member.tasksPendingRevocation());
}
@Test
@@ -208,12 +202,8 @@ public class StreamsGroupMemberTest {
assertEquals(record.memberEpoch(), member.memberEpoch());
assertEquals(record.previousMemberEpoch(),
member.previousMemberEpoch());
assertEquals(MemberState.fromValue(record.state()), member.state());
- assertEquals(ASSIGNED_ACTIVE_TASKS, member.assignedActiveTasks());
- assertEquals(ASSIGNED_STANDBY_TASKS, member.assignedStandbyTasks());
- assertEquals(ASSIGNED_WARMUP_TASKS, member.assignedWarmupTasks());
- assertEquals(ACTIVE_TASKS_PENDING_REVOCATION,
member.activeTasksPendingRevocation());
- assertEquals(STANDBY_TASKS_PENDING_REVOCATION,
member.standbyTasksPendingRevocation());
- assertEquals(WARMUP_TASKS_PENDING_REVOCATION,
member.warmupTasksPendingRevocation());
+ assertEquals(ASSIGNED_TASKS, member.assignedTasks());
+ assertEquals(TASKS_PENDING_REVOCATION,
member.tasksPendingRevocation());
assertNull(member.instanceId());
assertNull(member.rackId());
assertNull(member.rebalanceTimeoutMs());
@@ -275,12 +265,8 @@ public class StreamsGroupMemberTest {
assertEquals(member.state(), updatedMember.state());
assertEquals(member.clientId(), updatedMember.clientId());
assertEquals(member.clientHost(), updatedMember.clientHost());
- assertEquals(member.assignedActiveTasks(),
updatedMember.assignedActiveTasks());
- assertEquals(member.assignedStandbyTasks(),
updatedMember.assignedStandbyTasks());
- assertEquals(member.assignedWarmupTasks(),
updatedMember.assignedWarmupTasks());
- assertEquals(member.activeTasksPendingRevocation(),
updatedMember.activeTasksPendingRevocation());
- assertEquals(member.standbyTasksPendingRevocation(),
updatedMember.standbyTasksPendingRevocation());
- assertEquals(member.warmupTasksPendingRevocation(),
updatedMember.warmupTasksPendingRevocation());
+ assertEquals(member.assignedTasks(), updatedMember.assignedTasks());
+ assertEquals(member.tasksPendingRevocation(),
updatedMember.tasksPendingRevocation());
}
@Test
@@ -306,25 +292,8 @@ public class StreamsGroupMemberTest {
assertEquals(member.processId(), updatedMember.processId());
assertEquals(member.userEndpoint(), updatedMember.userEndpoint());
assertEquals(member.clientTags(), updatedMember.clientTags());
- assertEquals(member.assignedActiveTasks(),
updatedMember.assignedActiveTasks());
- assertEquals(member.assignedStandbyTasks(),
updatedMember.assignedStandbyTasks());
- assertEquals(member.assignedWarmupTasks(),
updatedMember.assignedWarmupTasks());
- assertEquals(member.activeTasksPendingRevocation(),
updatedMember.activeTasksPendingRevocation());
- assertEquals(member.standbyTasksPendingRevocation(),
updatedMember.standbyTasksPendingRevocation());
- assertEquals(member.warmupTasksPendingRevocation(),
updatedMember.warmupTasksPendingRevocation());
- }
-
- @Test
- public void testReturnUnmodifiableFields() {
- final StreamsGroupMember member = createStreamsGroupMember();
-
- assertThrows(UnsupportedOperationException.class, () ->
member.clientTags().put("not allowed", ""));
- assertThrows(UnsupportedOperationException.class, () ->
member.assignedActiveTasks().put("not allowed", Collections.emptySet()));
- assertThrows(UnsupportedOperationException.class, () ->
member.assignedStandbyTasks().put("not allowed", Collections.emptySet()));
- assertThrows(UnsupportedOperationException.class, () ->
member.assignedWarmupTasks().put("not allowed", Collections.emptySet()));
- assertThrows(UnsupportedOperationException.class, () ->
member.activeTasksPendingRevocation().put("not allowed",
Collections.emptySet()));
- assertThrows(UnsupportedOperationException.class, () ->
member.standbyTasksPendingRevocation().put("not allowed",
Collections.emptySet()));
- assertThrows(UnsupportedOperationException.class, () ->
member.warmupTasksPendingRevocation().put("not allowed",
Collections.emptySet()));
+ assertEquals(member.assignedTasks(), updatedMember.assignedTasks());
+ assertEquals(member.tasksPendingRevocation(),
updatedMember.tasksPendingRevocation());
}
@Test
@@ -333,7 +302,7 @@ public class StreamsGroupMemberTest {
List<Integer> assignedTasks1 = Arrays.asList(10, 11, 12);
List<Integer> assignedTasks2 = Arrays.asList(13, 14, 15);
List<Integer> assignedTasks3 = Arrays.asList(16, 17, 18);
- Assignment targetAssignment = new Assignment(
+ TasksTuple targetAssignment = new TasksTuple(
mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(assignedTasks3))),
mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(assignedTasks2))),
mkMap(mkEntry(SUBTOPOLOGY3, new HashSet<>(assignedTasks1)))
@@ -404,6 +373,45 @@ public class StreamsGroupMemberTest {
assertEquals(new StreamsGroupDescribeResponseData.Assignment(),
streamsGroupDescribeMember.targetAssignment());
}
+ @Test
+ public void testHasAssignedTasksChanged() {
+ StreamsGroupMember member1 = new StreamsGroupMember.Builder(MEMBER_ID)
+ .setAssignedTasks(new TasksTuple(
+ mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS1))),
+ mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(TASKS2))),
+ mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS3)))
+ ))
+ .build();
+
+ StreamsGroupMember member2 = new StreamsGroupMember.Builder(MEMBER_ID)
+ .setAssignedTasks(new TasksTuple(
+ mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS4))),
+ mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(TASKS5))),
+ mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS6)))
+ ))
+ .build();
+
+ assertTrue(StreamsGroupMember.hasAssignedTasksChanged(member1,
member2));
+
+ StreamsGroupMember member3 = new StreamsGroupMember.Builder(MEMBER_ID)
+ .setAssignedTasks(new TasksTuple(
+ mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS1))),
+ mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(TASKS2))),
+ mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS3)))
+ ))
+ .build();
+
+ StreamsGroupMember member4 = new StreamsGroupMember.Builder(MEMBER_ID)
+ .setAssignedTasks(new TasksTuple(
+ mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS1))),
+ mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(TASKS2))),
+ mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS3)))
+ ))
+ .build();
+
+ assertFalse(StreamsGroupMember.hasAssignedTasksChanged(member3,
member4));
+ }
+
private StreamsGroupMember createStreamsGroupMember() {
return new StreamsGroupMember.Builder(MEMBER_ID)
.setMemberEpoch(MEMBER_EPOCH)
@@ -418,12 +426,8 @@ public class StreamsGroupMemberTest {
.setProcessId(PROCESS_ID)
.setUserEndpoint(USER_ENDPOINT)
.setClientTags(CLIENT_TAGS)
- .setAssignedActiveTasks(ASSIGNED_ACTIVE_TASKS)
- .setAssignedStandbyTasks(ASSIGNED_STANDBY_TASKS)
- .setAssignedWarmupTasks(ASSIGNED_WARMUP_TASKS)
- .setActiveTasksPendingRevocation(ACTIVE_TASKS_PENDING_REVOCATION)
- .setStandbyTasksPendingRevocation(STANDBY_TASKS_PENDING_REVOCATION)
- .setWarmupTasksPendingRevocation(WARMUP_TASKS_PENDING_REVOCATION)
+ .setAssignedTasks(ASSIGNED_TASKS)
+ .setTasksPendingRevocation(TASKS_PENDING_REVOCATION)
.build();
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
index 47668ec84c0..f633fec80f7 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
@@ -17,24 +17,27 @@
package org.apache.kafka.coordinator.group.streams;
import java.util.AbstractMap;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
public class TaskAssignmentTestUtil {
- public static Assignment mkAssignment(final Map<String, Set<Integer>>
activeTasks,
- final Map<String, Set<Integer>>
standbyTasks,
- final Map<String, Set<Integer>>
warmupTasks) {
- return new Assignment(
- Collections.unmodifiableMap(Objects.requireNonNull(activeTasks)),
- Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks)),
- Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks))
- );
+ public enum TaskRole {
+ ACTIVE,
+ STANDBY,
+ WARMUP
+ }
+
+ @SafeVarargs
+ public static TasksTuple mkTasksTuple(TaskRole taskRole, Map.Entry<String,
Set<Integer>>... entries) {
+ return switch (taskRole) {
+ case ACTIVE -> new TasksTuple(mkTasksPerSubtopology(entries), new
HashMap<>(), new HashMap<>());
+ case STANDBY -> new TasksTuple(new HashMap<>(),
mkTasksPerSubtopology(entries), new HashMap<>());
+ case WARMUP -> new TasksTuple(new HashMap<>(), new HashMap<>(),
mkTasksPerSubtopology(entries));
+ };
}
public static Map.Entry<String, Set<Integer>> mkTasks(String subtopologyId,
@@ -46,8 +49,7 @@ public class TaskAssignmentTestUtil {
}
@SafeVarargs
- public static Map<String, Set<Integer>>
mkTasksPerSubtopology(Map.Entry<String,
-
Set<Integer>>... entries) {
+ public static Map<String, Set<Integer>>
mkTasksPerSubtopology(Map.Entry<String, Set<Integer>>... entries) {
Map<String, Set<Integer>> assignment = new HashMap<>();
for (Map.Entry<String, Set<Integer>> entry : entries) {
assignment.put(entry.getKey(), entry.getValue());
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleTest.java
similarity index 57%
rename from
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
rename to
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleTest.java
index 7c0baf27364..73c43a6d088 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleTest.java
@@ -30,19 +30,21 @@ import java.util.Set;
import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
import static
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-public class AssignmentTest {
+public class TasksTupleTest {
- static final String SUBTOPOLOGY_1 = "subtopology1";
- static final String SUBTOPOLOGY_2 = "subtopology2";
- static final String SUBTOPOLOGY_3 = "subtopology3";
+ private static final String SUBTOPOLOGY_1 = "subtopology1";
+ private static final String SUBTOPOLOGY_2 = "subtopology2";
+ private static final String SUBTOPOLOGY_3 = "subtopology3";
@Test
public void testTasksCannotBeNull() {
- assertThrows(NullPointerException.class, () -> new Assignment(null,
Collections.emptyMap(), Collections.emptyMap()));
- assertThrows(NullPointerException.class, () -> new
Assignment(Collections.emptyMap(), null, Collections.emptyMap()));
- assertThrows(NullPointerException.class, () -> new
Assignment(Collections.emptyMap(), Collections.emptyMap(), null));
+ assertThrows(NullPointerException.class, () -> new TasksTuple(null,
Collections.emptyMap(), Collections.emptyMap()));
+ assertThrows(NullPointerException.class, () -> new
TasksTuple(Collections.emptyMap(), null, Collections.emptyMap()));
+ assertThrows(NullPointerException.class, () -> new
TasksTuple(Collections.emptyMap(), Collections.emptyMap(), null));
}
@Test
@@ -56,14 +58,14 @@ public class AssignmentTest {
Map<String, Set<Integer>> warmupTasks = mkTasksPerSubtopology(
mkTasks(SUBTOPOLOGY_3, 4, 5, 6)
);
- Assignment assignment = new Assignment(activeTasks, standbyTasks,
warmupTasks);
-
- assertEquals(activeTasks, assignment.activeTasks());
- assertThrows(UnsupportedOperationException.class, () ->
assignment.activeTasks().put("not allowed", Collections.emptySet()));
- assertEquals(standbyTasks, assignment.standbyTasks());
- assertThrows(UnsupportedOperationException.class, () ->
assignment.standbyTasks().put("not allowed", Collections.emptySet()));
- assertEquals(warmupTasks, assignment.warmupTasks());
- assertThrows(UnsupportedOperationException.class, () ->
assignment.warmupTasks().put("not allowed", Collections.emptySet()));
+ TasksTuple tuple = new TasksTuple(activeTasks, standbyTasks,
warmupTasks);
+
+ assertEquals(activeTasks, tuple.activeTasks());
+ assertThrows(UnsupportedOperationException.class, () ->
tuple.activeTasks().put("not allowed", Collections.emptySet()));
+ assertEquals(standbyTasks, tuple.standbyTasks());
+ assertThrows(UnsupportedOperationException.class, () ->
tuple.standbyTasks().put("not allowed", Collections.emptySet()));
+ assertEquals(warmupTasks, tuple.warmupTasks());
+ assertThrows(UnsupportedOperationException.class, () ->
tuple.warmupTasks().put("not allowed", Collections.emptySet()));
}
@Test
@@ -95,28 +97,83 @@ public class AssignmentTest {
.setStandbyTasks(standbyTasks)
.setWarmupTasks(warmupTasks);
- Assignment assignment = Assignment.fromRecord(record);
+ TasksTuple tuple = TasksTuple.fromTargetAssignmentRecord(record);
assertEquals(
mkTasksPerSubtopology(
mkTasks(SUBTOPOLOGY_1, 1, 2, 3),
mkTasks(SUBTOPOLOGY_2, 4, 5, 6)
),
- assignment.activeTasks()
+ tuple.activeTasks()
);
assertEquals(
mkTasksPerSubtopology(
mkTasks(SUBTOPOLOGY_1, 7, 8, 9),
mkTasks(SUBTOPOLOGY_2, 1, 2, 3)
),
- assignment.standbyTasks()
+ tuple.standbyTasks()
);
assertEquals(
mkTasksPerSubtopology(
mkTasks(SUBTOPOLOGY_1, 4, 5, 6),
mkTasks(SUBTOPOLOGY_2, 7, 8, 9)
),
- assignment.warmupTasks()
+ tuple.warmupTasks()
+ );
+ }
+
+ @Test
+ public void testMerge() {
+ TasksTuple tuple1 = new TasksTuple(
+ Map.of(SUBTOPOLOGY_1, Set.of(1, 2, 3)),
+ Map.of(SUBTOPOLOGY_2, Set.of(4, 5, 6)),
+ Map.of(SUBTOPOLOGY_3, Set.of(7, 8, 9))
+ );
+
+ TasksTuple tuple2 = new TasksTuple(
+ Map.of(SUBTOPOLOGY_1, Set.of(10, 11)),
+ Map.of(SUBTOPOLOGY_2, Set.of(12, 13)),
+ Map.of(SUBTOPOLOGY_3, Set.of(14, 15))
);
+
+ TasksTuple mergedTuple = tuple1.merge(tuple2);
+
+ assertEquals(Map.of(SUBTOPOLOGY_1, Set.of(1, 2, 3, 10, 11)),
mergedTuple.activeTasks());
+ assertEquals(Map.of(SUBTOPOLOGY_2, Set.of(4, 5, 6, 12, 13)),
mergedTuple.standbyTasks());
+ assertEquals(Map.of(SUBTOPOLOGY_3, Set.of(7, 8, 9, 14, 15)),
mergedTuple.warmupTasks());
+ }
+
+ @Test
+ public void testContainsAny() {
+ TasksTuple tuple1 = new TasksTuple(
+ Map.of(SUBTOPOLOGY_1, Set.of(1, 2, 3)),
+ Map.of(SUBTOPOLOGY_2, Set.of(4, 5, 6)),
+ Map.of(SUBTOPOLOGY_3, Set.of(7, 8, 9))
+ );
+
+ TasksTuple tuple2 = new TasksTuple(
+ Map.of(SUBTOPOLOGY_1, Set.of(3, 10, 11)),
+ Map.of(SUBTOPOLOGY_2, Set.of(12, 13)),
+ Map.of(SUBTOPOLOGY_3, Set.of(14, 15))
+ );
+
+ assertTrue(tuple1.containsAny(tuple2));
+
+ TasksTuple tuple3 = new TasksTuple(
+ Map.of(SUBTOPOLOGY_1, Set.of(10, 11)),
+ Map.of(SUBTOPOLOGY_2, Set.of(12, 13)),
+ Map.of(SUBTOPOLOGY_3, Set.of(14, 15))
+ );
+
+ assertFalse(tuple1.containsAny(tuple3));
+ }
+
+ @Test
+ public void testIsEmpty() {
+ TasksTuple emptyTuple = new TasksTuple(Map.of(), Map.of(), Map.of());
+ assertTrue(emptyTuple.isEmpty());
+
+ TasksTuple nonEmptyTuple = new TasksTuple(Map.of(SUBTOPOLOGY_1,
Set.of(1)), Map.of(), Map.of());
+ assertFalse(nonEmptyTuple.isEmpty());
}
}