This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a88fd01e744 KAFKA-19478 [1/N]: Precompute values in ProcessState (#20120) a88fd01e744 is described below commit a88fd01e7441d22df6255a494de01bd64ce624e6 Author: Lucas Brutschy <lbruts...@confluent.io> AuthorDate: Tue Jul 8 13:32:47 2025 +0200 KAFKA-19478 [1/N]: Precompute values in ProcessState (#20120) This is a very mechanical and obvious change that is making most accessors in ProcessState constant time O(1), instead of linear time O(n), by computing the collections and aggregations at insertion time, instead of every time the value is accessed. Since the accessors are used in deeply nested loops, this reduces the runtime of our worst case benchmarks by ~14x. Reviewers: Bill Bejeck <bbej...@apache.org> --- .../group/streams/assignor/ProcessState.java | 37 ++++++++++------------ .../group/streams/assignor/StickyTaskAssignor.java | 2 +- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java index ff68269e561..d4dd2d4ba49 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java @@ -22,9 +22,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import static java.util.Collections.unmodifiableSet; -import static org.apache.kafka.common.utils.Utils.union; - /** * Represents the state of a process in the group coordinator. * This includes the capacity of the process, the load on the process, and the tasks assigned to the process. @@ -34,15 +31,18 @@ public class ProcessState { // number of members private int capacity; private double load; + private int taskCount; + private int activeTaskCount; private final Map<String, Integer> memberToTaskCounts; private final Map<String, Set<TaskId>> assignedActiveTasks; private final Map<String, Set<TaskId>> assignedStandbyTasks; - + private final Set<TaskId> assignedTasks; ProcessState(final String processId) { this.processId = processId; this.capacity = 0; this.load = Double.MAX_VALUE; + this.assignedTasks = new HashSet<>(); this.assignedActiveTasks = new HashMap<>(); this.assignedStandbyTasks = new HashMap<>(); this.memberToTaskCounts = new HashMap<>(); @@ -57,10 +57,6 @@ public class ProcessState { return capacity; } - public int totalTaskCount() { - return assignedStandbyTasks().size() + assignedActiveTasks().size(); - } - public double load() { return load; } @@ -69,6 +65,10 @@ public class ProcessState { return memberToTaskCounts; } + public int activeTaskCount() { + return activeTaskCount; + } + public Set<TaskId> assignedActiveTasks() { return assignedActiveTasks.values().stream() .flatMap(Set::stream) @@ -90,7 +90,10 @@ public class ProcessState { } public void addTask(final String memberId, final TaskId taskId, final boolean isActive) { + taskCount += 1; + assignedTasks.add(taskId); if (isActive) { + activeTaskCount += 1; assignedActiveTasks.putIfAbsent(memberId, new HashSet<>()); assignedActiveTasks.get(memberId).add(taskId); } else { @@ -110,7 +113,7 @@ public class ProcessState { if (capacity <= 0) { this.load = -1; } else { - this.load = (double) totalTaskCount() / capacity; + this.load = (double) taskCount / capacity; } } @@ -120,7 +123,7 @@ public class ProcessState { } public boolean hasCapacity() { - return totalTaskCount() < capacity; + return this.load < 1.0; } public int compareTo(final ProcessState other) { @@ -132,18 +135,10 @@ public class ProcessState { } public boolean hasTask(final TaskId taskId) { - return assignedActiveTasks().contains(taskId) || assignedStandbyTasks().contains(taskId); } - + return assignedTasks.contains(taskId); + } Set<TaskId> assignedTasks() { - final Set<TaskId> assignedActiveTaskIds = assignedActiveTasks(); - final Set<TaskId> assignedStandbyTaskIds = assignedStandbyTasks(); - return unmodifiableSet( - union( - () -> new HashSet<>(assignedActiveTaskIds.size() + assignedStandbyTaskIds.size()), - assignedActiveTaskIds, - assignedStandbyTaskIds - ) - ); + return assignedTasks; } } \ No newline at end of file diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java index c8d3c97d504..b1f1d9b1a11 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java @@ -329,7 +329,7 @@ public class StickyTaskAssignor implements TaskAssignor { if (isActive) { // update task per process - maybeUpdateTasksPerMember(localState.processIdToState.get(member.processId).assignedActiveTasks().size()); + maybeUpdateTasksPerMember(localState.processIdToState.get(member.processId).activeTaskCount()); } }