This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new af934aa2495 MINOR: Rename CoordinatorRuntime.activeTopicPartitions to
activeCoordinators (#21010)
af934aa2495 is described below
commit af934aa2495a0832275e9d5a8b84fe263fa00027
Author: David Jacot <[email protected]>
AuthorDate: Fri Nov 28 09:16:34 2025 +0100
MINOR: Rename CoordinatorRuntime.activeTopicPartitions to
activeCoordinators (#21010)
The coordinator runtime uses the term coordinator in all APIs so
`CoordinatorRuntime.activeTopicPartitions` feels a bit weird. This patch
renames it to `CoordinatorRuntime.activeCoordinators` to follow the
convention. It also makes the `state` field within the context volatile
as it is accesses without the log in that method.
Reviewers: Chia-Ping Tsai <[email protected]>, Lianet Magrans
<[email protected]>
---
.../kafka/coordinator/common/runtime/CoordinatorRuntime.java | 11 +++--------
.../kafka/coordinator/share/ShareCoordinatorService.java | 2 +-
.../kafka/coordinator/share/ShareCoordinatorServiceTest.java | 4 ++--
3 files changed, 6 insertions(+), 11 deletions(-)
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 07e22f91e63..3504b55ca9d 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -578,7 +578,7 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
/**
* The current state.
*/
- CoordinatorState state;
+ volatile CoordinatorState state;
/**
* The current epoch of the coordinator. This represents
@@ -2611,14 +2611,9 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
}
/**
- * Util method which returns all the topic partitions for which
- * the state machine is in active state.
- * <p>
- * This could be useful if the caller does not have a specific
- * target internal topic partition.
- * @return List of {@link TopicPartition} whose coordinators are active
+ * @return List of {@link TopicPartition} whose coordinators are active.
*/
- public List<TopicPartition> activeTopicPartitions() {
+ public List<TopicPartition> activeCoordinators() {
if (coordinators == null || coordinators.isEmpty()) {
return List.of();
}
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index d0386e32b9f..ad6e7d37ab0 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -301,7 +301,7 @@ public class ShareCoordinatorService implements
ShareCoordinator {
return;
}
List<CompletableFuture<Void>> futures = new ArrayList<>();
- runtime.activeTopicPartitions().forEach(tp ->
futures.add(performRecordPruning(tp)));
+ runtime.activeCoordinators().forEach(tp ->
futures.add(performRecordPruning(tp)));
CompletableFuture.allOf(futures.toArray(new
CompletableFuture<?>[]{}))
.whenComplete((res, exp) -> {
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 75049cfb039..2c3ef63af6b 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -93,7 +93,7 @@ class ShareCoordinatorServiceTest {
@SuppressWarnings("unchecked")
private CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord>
mockRuntime() {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mock(CoordinatorRuntime.class);
- when(runtime.activeTopicPartitions())
+ when(runtime.activeCoordinators())
.thenReturn(List.of(new
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)));
return runtime;
}
@@ -1533,7 +1533,7 @@ class ShareCoordinatorServiceTest {
TopicPartition tp1 = new
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0);
TopicPartition tp2 = new
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1);
- when(runtime.activeTopicPartitions())
+ when(runtime.activeCoordinators())
.thenReturn(List.of(tp1, tp2));
when(writer.deleteRecords(