This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new af934aa2495 MINOR: Rename CoordinatorRuntime.activeTopicPartitions to 
activeCoordinators (#21010)
af934aa2495 is described below

commit af934aa2495a0832275e9d5a8b84fe263fa00027
Author: David Jacot <[email protected]>
AuthorDate: Fri Nov 28 09:16:34 2025 +0100

    MINOR: Rename CoordinatorRuntime.activeTopicPartitions to 
activeCoordinators (#21010)
    
    The coordinator runtime uses the term coordinator in all APIs so
    `CoordinatorRuntime.activeTopicPartitions` feels a bit weird. This patch
    renames it to `CoordinatorRuntime.activeCoordinators` to follow the
    convention. It also makes the `state` field within the context volatile
    as it is accesses without the log in that method.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Lianet Magrans 
<[email protected]>
---
 .../kafka/coordinator/common/runtime/CoordinatorRuntime.java  | 11 +++--------
 .../kafka/coordinator/share/ShareCoordinatorService.java      |  2 +-
 .../kafka/coordinator/share/ShareCoordinatorServiceTest.java  |  4 ++--
 3 files changed, 6 insertions(+), 11 deletions(-)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 07e22f91e63..3504b55ca9d 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -578,7 +578,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         /**
          * The current state.
          */
-        CoordinatorState state;
+        volatile CoordinatorState state;
 
         /**
          * The current epoch of the coordinator. This represents
@@ -2611,14 +2611,9 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
     }
 
     /**
-     * Util method which returns all the topic partitions for which
-     * the state machine is in active state.
-     * <p>
-     * This could be useful if the caller does not have a specific
-     * target internal topic partition.
-     * @return List of {@link TopicPartition} whose coordinators are active
+     * @return List of {@link TopicPartition} whose coordinators are active.
      */
-    public List<TopicPartition> activeTopicPartitions() {
+    public List<TopicPartition> activeCoordinators() {
         if (coordinators == null || coordinators.isEmpty()) {
             return List.of();
         }
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index d0386e32b9f..ad6e7d37ab0 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -301,7 +301,7 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
                     return;
                 }
                 List<CompletableFuture<Void>> futures = new ArrayList<>();
-                runtime.activeTopicPartitions().forEach(tp -> 
futures.add(performRecordPruning(tp)));
+                runtime.activeCoordinators().forEach(tp -> 
futures.add(performRecordPruning(tp)));
 
                 CompletableFuture.allOf(futures.toArray(new 
CompletableFuture<?>[]{}))
                     .whenComplete((res, exp) -> {
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 75049cfb039..2c3ef63af6b 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -93,7 +93,7 @@ class ShareCoordinatorServiceTest {
     @SuppressWarnings("unchecked")
     private CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> 
mockRuntime() {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mock(CoordinatorRuntime.class);
-        when(runtime.activeTopicPartitions())
+        when(runtime.activeCoordinators())
             .thenReturn(List.of(new 
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)));
         return runtime;
     }
@@ -1533,7 +1533,7 @@ class ShareCoordinatorServiceTest {
         TopicPartition tp1 = new 
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0);
         TopicPartition tp2 = new 
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1);
 
-        when(runtime.activeTopicPartitions())
+        when(runtime.activeCoordinators())
             .thenReturn(List.of(tp1, tp2));
 
         when(writer.deleteRecords(

Reply via email to