This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a88fd01e744 KAFKA-19478 [1/N]: Precompute values in ProcessState (#20120)
a88fd01e744 is described below

commit a88fd01e7441d22df6255a494de01bd64ce624e6
Author: Lucas Brutschy <lbruts...@confluent.io>
AuthorDate: Tue Jul 8 13:32:47 2025 +0200

    KAFKA-19478 [1/N]: Precompute values in ProcessState (#20120)
    
    This is a mechanical, straightforward change that makes most accessors
    in ProcessState constant time, O(1), instead of linear time, O(n), by
    computing the collections and aggregations at insertion time rather
    than every time the value is accessed.
    
    Because the accessors are called inside deeply nested loops, this
    reduces the runtime of our worst-case benchmarks by ~14x.
    
    Reviewers: Bill Bejeck <bbej...@apache.org>
---
 .../group/streams/assignor/ProcessState.java       | 37 ++++++++++------------
 .../group/streams/assignor/StickyTaskAssignor.java |  2 +-
 2 files changed, 17 insertions(+), 22 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java
index ff68269e561..d4dd2d4ba49 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java
@@ -22,9 +22,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.unmodifiableSet;
-import static org.apache.kafka.common.utils.Utils.union;
-
 /**
  * Represents the state of a process in the group coordinator.
  * This includes the capacity of the process, the load on the process, and the tasks assigned to the process.
@@ -34,15 +31,18 @@ public class ProcessState {
     // number of members
     private int capacity;
     private double load;
+    private int taskCount;
+    private int activeTaskCount;
     private final Map<String, Integer> memberToTaskCounts;
     private final Map<String, Set<TaskId>> assignedActiveTasks;
     private final Map<String, Set<TaskId>> assignedStandbyTasks;
-
+    private final Set<TaskId> assignedTasks;
 
     ProcessState(final String processId) {
         this.processId = processId;
         this.capacity = 0;
         this.load = Double.MAX_VALUE;
+        this.assignedTasks = new HashSet<>();
         this.assignedActiveTasks = new HashMap<>();
         this.assignedStandbyTasks = new HashMap<>();
         this.memberToTaskCounts = new HashMap<>();
@@ -57,10 +57,6 @@ public class ProcessState {
         return capacity;
     }
 
-    public int totalTaskCount() {
-        return assignedStandbyTasks().size() + assignedActiveTasks().size();
-    }
-
     public double load() {
         return load;
     }
@@ -69,6 +65,10 @@ public class ProcessState {
         return memberToTaskCounts;
     }
 
+    public int activeTaskCount() {
+        return activeTaskCount;
+    }
+
     public Set<TaskId> assignedActiveTasks() {
         return assignedActiveTasks.values().stream()
             .flatMap(Set::stream)
@@ -90,7 +90,10 @@ public class ProcessState {
     }
 
     public void addTask(final String memberId, final TaskId taskId, final boolean isActive) {
+        taskCount += 1;
+        assignedTasks.add(taskId);
         if (isActive) {
+            activeTaskCount += 1;
             assignedActiveTasks.putIfAbsent(memberId, new HashSet<>());
             assignedActiveTasks.get(memberId).add(taskId);
         } else {
@@ -110,7 +113,7 @@ public class ProcessState {
         if (capacity <= 0) {
             this.load = -1;
         } else {
-            this.load = (double) totalTaskCount() / capacity;
+            this.load = (double) taskCount / capacity;
         }
     }
 
@@ -120,7 +123,7 @@ public class ProcessState {
     }
 
     public boolean hasCapacity() {
-        return totalTaskCount() < capacity;
+        return this.load < 1.0;
     }
 
     public int compareTo(final ProcessState other) {
@@ -132,18 +135,10 @@ public class ProcessState {
     }
 
     public boolean hasTask(final TaskId taskId) {
-        return assignedActiveTasks().contains(taskId) || assignedStandbyTasks().contains(taskId);    }
-
+        return assignedTasks.contains(taskId);
+    }
 
     Set<TaskId> assignedTasks() {
-        final Set<TaskId> assignedActiveTaskIds = assignedActiveTasks();
-        final Set<TaskId> assignedStandbyTaskIds = assignedStandbyTasks();
-        return unmodifiableSet(
-            union(
-                () -> new HashSet<>(assignedActiveTaskIds.size() + assignedStandbyTaskIds.size()),
-                assignedActiveTaskIds,
-                assignedStandbyTaskIds
-            )
-        );
+        return assignedTasks;
     }
 }
\ No newline at end of file
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
index c8d3c97d504..b1f1d9b1a11 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
@@ -329,7 +329,7 @@ public class StickyTaskAssignor implements TaskAssignor {
 
         if (isActive) {
             // update task per process
-            maybeUpdateTasksPerMember(localState.processIdToState.get(member.processId).assignedActiveTasks().size());
+            maybeUpdateTasksPerMember(localState.processIdToState.get(member.processId).activeTaskCount());
         }
     }
 

Reply via email to