This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d27d90ccb3b MINOR: Refactor OffsetFetch path (#21009)
d27d90ccb3b is described below
commit d27d90ccb3b2b98e02de42afd50910fbbbc162d0
Author: David Jacot <[email protected]>
AuthorDate: Thu Nov 27 19:25:10 2025 +0100
MINOR: Refactor OffsetFetch path (#21009)
The `GroupCoordinator` interface has two methods to fetch offsets:
`fetchOffsets` and `fetchAllOffsets`. They have the exact same signature
and the implementation in `GroupCoordinatorService` is exactly the same,
modulo the name of the operation. This patch refactors the path to
simplify it and reuse more code. We could further refactor `KafkaApis`
but let's do this in a follow-up in order to keep this change small and
simple.
Reviewers: Sean Quah <[email protected]>, Lianet Magrans
<[email protected]>
---
.../kafka/common/requests/OffsetFetchRequest.java | 4 ++
core/src/main/scala/kafka/server/KafkaApis.scala | 4 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 12 ++--
.../kafka/coordinator/group/GroupCoordinator.java | 15 -----
.../coordinator/group/GroupCoordinatorService.java | 67 ++--------------------
.../coordinator/group/GroupCoordinatorShard.java | 23 ++------
.../group/GroupCoordinatorServiceTest.java | 51 ++++++++--------
.../group/GroupCoordinatorShardTest.java | 48 ++++++++++++++++
8 files changed, 99 insertions(+), 125 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 83b62b5c479..7f53e81b14e 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -325,6 +325,10 @@ public class OffsetFetchRequest extends AbstractRequest {
return version >= TOPIC_ID_MIN_VERSION;
}
+ public static boolean
requestAllOffsets(OffsetFetchRequestData.OffsetFetchRequestGroup request) {
+ return request.topics() == null;
+ }
+
@Override
public OffsetFetchRequestData data() {
return data;
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index affdced8cf4..b7f2e3364f0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1014,7 +1014,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val futures = new
mutable.ArrayBuffer[CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]](groups.size)
groups.forEach { groupOffsetFetch =>
- val isAllPartitions = groupOffsetFetch.topics == null
+ val isAllPartitions =
OffsetFetchRequest.requestAllOffsets(groupOffsetFetch)
if (!authHelper.authorize(request.context, DESCRIBE, GROUP,
groupOffsetFetch.groupId)) {
futures +=
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
groupOffsetFetch,
@@ -1050,7 +1050,7 @@ class KafkaApis(val requestChannel: RequestChannel,
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion)
- groupCoordinator.fetchAllOffsets(
+ groupCoordinator.fetchOffsets(
requestContext,
groupFetchRequest,
requireStable
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index dc0dbe11bdd..4d4fbe5b710 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -9032,7 +9032,7 @@ class KafkaApisTest extends Logging {
)).thenReturn(group1Future)
val group2Future = new
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
- when(groupCoordinator.fetchAllOffsets(
+ when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group-2")
@@ -9041,7 +9041,7 @@ class KafkaApisTest extends Logging {
)).thenReturn(group2Future)
val group3Future = new
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
- when(groupCoordinator.fetchAllOffsets(
+ when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group-3")
@@ -9050,7 +9050,7 @@ class KafkaApisTest extends Logging {
)).thenReturn(group3Future)
val group4Future = new
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
- when(groupCoordinator.fetchAllOffsets(
+ when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group-4")
@@ -9190,7 +9190,7 @@ class KafkaApisTest extends Logging {
)).thenReturn(group1Future)
val group2Future = new
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
- when(groupCoordinator.fetchAllOffsets(
+ when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group-2")
@@ -9384,7 +9384,7 @@ class KafkaApisTest extends Logging {
val requestChannelRequest = makeRequest(version)
val future = new
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
- when(groupCoordinator.fetchAllOffsets(
+ when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group-1")
@@ -9546,7 +9546,7 @@ class KafkaApisTest extends Logging {
// group-3 is allowed and bar is allowed.
val group3Future = new
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
- when(groupCoordinator.fetchAllOffsets(
+ when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group-3")
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 83dc9622683..efdbb57a74a 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -286,21 +286,6 @@ public interface GroupCoordinator {
boolean requireStable
);
- /**
- * Fetch all offsets for a given Group.
- *
- * @param context The request context.
- * @param request The OffsetFetchRequestGroup request.
- *
- * @return A future yielding the results.
- * The error codes of the results are set to indicate the errors
occurred during the execution.
- */
- CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>
fetchAllOffsets(
- AuthorizableRequestContext context,
- OffsetFetchRequestData.OffsetFetchRequestGroup request,
- boolean requireStable
- );
-
/**
* Describe the Share Group Offsets for a given group.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 2817eba1798..35797024c10 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -72,6 +72,7 @@ import
org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
@@ -1629,6 +1630,8 @@ public class GroupCoordinatorService implements
GroupCoordinator {
));
}
+ var name = OffsetFetchRequest.requestAllOffsets(request) ?
"fetch-all-offsets" : "fetch-offsets";
+
// The require stable flag when set tells the broker to hold on
returning unstable
// (or uncommitted) offsets. In the previous implementation of the
group coordinator,
// the UNSTABLE_OFFSET_COMMIT error is returned when unstable offsets
are present. As
@@ -1639,7 +1642,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
// the pending offsets are committed. Otherwise, we use a read
operation.
if (requireStable) {
return runtime.scheduleWriteOperation(
- "fetch-offsets",
+ name,
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> new CoordinatorResult<>(
@@ -1647,78 +1650,20 @@ public class GroupCoordinatorService implements
GroupCoordinator {
coordinator.fetchOffsets(request, Long.MAX_VALUE)
)
).exceptionally(exception -> handleOffsetFetchException(
- "fetch-offsets",
+ name,
context,
request,
exception
));
} else {
return runtime.scheduleReadOperation(
- "fetch-offsets",
+ name,
topicPartitionFor(request.groupId()),
(coordinator, offset) -> coordinator.fetchOffsets(request,
offset)
);
}
}
- /**
- * See {@link GroupCoordinator#fetchAllOffsets(AuthorizableRequestContext,
OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}.
- */
- @Override
- public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>
fetchAllOffsets(
- AuthorizableRequestContext context,
- OffsetFetchRequestData.OffsetFetchRequestGroup request,
- boolean requireStable
- ) {
- if (!isActive.get()) {
- return
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
- request,
- Errors.COORDINATOR_NOT_AVAILABLE,
- context.requestVersion()
- ));
- }
-
- // For backwards compatibility, we support fetch commits for the empty
group id.
- if (request.groupId() == null) {
- return
CompletableFuture.completedFuture(OffsetFetchResponse.groupError(
- request,
- Errors.INVALID_GROUP_ID,
- context.requestVersion()
- ));
- }
-
- // The require stable flag when set tells the broker to hold on
returning unstable
- // (or uncommitted) offsets. In the previous implementation of the
group coordinator,
- // the UNSTABLE_OFFSET_COMMIT error is returned when unstable offsets
are present. As
- // the new implementation relies on timeline data structures, the
coordinator does not
- // really know whether offsets are stable or not so it is hard to
return the same error.
- // Instead, we use a write operation when the flag is set to guarantee
that the fetch
- // is based on all the available offsets and to ensure that the
response waits until
- // the pending offsets are committed. Otherwise, we use a read
operation.
- if (requireStable) {
- return runtime.scheduleWriteOperation(
- "fetch-all-offsets",
- topicPartitionFor(request.groupId()),
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator -> new CoordinatorResult<>(
- List.of(),
- coordinator.fetchAllOffsets(request, Long.MAX_VALUE)
- )
- ).exceptionally(exception -> handleOffsetFetchException(
- "fetch-all-offsets",
- context,
- request,
- exception
- ));
- } else {
- return runtime.scheduleReadOperation(
- "fetch-all-offsets",
- topicPartitionFor(request.groupId()),
- (coordinator, offset) -> coordinator.fetchAllOffsets(request,
offset)
- );
- }
- }
-
/**
* See {@link
GroupCoordinator#describeShareGroupOffsets(AuthorizableRequestContext,
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index d4c1ba1d3cd..beb37b9aba0 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -57,6 +57,7 @@ import
org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -806,23 +807,11 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
OffsetFetchRequestData.OffsetFetchRequestGroup request,
long epoch
) throws ApiException {
- return offsetMetadataManager.fetchOffsets(request, epoch);
- }
-
- /**
- * Fetch all offsets for a given group.
- *
- * @param request The OffsetFetchRequestGroup request.
- * @param epoch The epoch (or offset) used to read from the
- * timeline data structure.
- *
- * @return A List of OffsetFetchResponseTopics response.
- */
- public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
- OffsetFetchRequestData.OffsetFetchRequestGroup request,
- long epoch
- ) throws ApiException {
- return offsetMetadataManager.fetchAllOffsets(request, epoch);
+ if (OffsetFetchRequest.requestAllOffsets(request)) {
+ return offsetMetadataManager.fetchAllOffsets(request, epoch);
+ } else {
+ return offsetMetadataManager.fetchOffsets(request, epoch);
+ }
}
/**
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 75e0bc45c76..6dff3e701b9 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -1722,11 +1722,14 @@ public class GroupCoordinatorServiceTest {
OffsetFetchRequestData.OffsetFetchRequestGroup request =
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group");
- if (!fetchAllOffsets) {
- request
- .setTopics(List.of(new
OffsetFetchRequestData.OffsetFetchRequestTopics()
- .setName("foo")
- .setPartitionIndexes(List.of(0))));
+
+ if (fetchAllOffsets) {
+ request.setTopics(null);
+ } else {
+ request.setTopics(List.of(new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List.of(0))
+ ));
}
OffsetFetchResponseData.OffsetFetchResponseGroup response =
@@ -1753,9 +1756,7 @@ public class GroupCoordinatorServiceTest {
)).thenReturn(CompletableFuture.completedFuture(response));
}
- TriFunction<RequestContext,
OffsetFetchRequestData.OffsetFetchRequestGroup, Boolean,
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>>
fetchOffsets =
- fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets;
- CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>
future = fetchOffsets.apply(
+ CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>
future = service.fetchOffsets(
requestContext(ApiKeys.OFFSET_FETCH),
request,
requireStable
@@ -1784,16 +1785,17 @@ public class GroupCoordinatorServiceTest {
OffsetFetchRequestData.OffsetFetchRequestGroup request =
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group");
- if (!fetchAllOffsets) {
- request
- .setTopics(List.of(new
OffsetFetchRequestData.OffsetFetchRequestTopics()
- .setName("foo")
- .setPartitionIndexes(List.of(0))));
+
+ if (fetchAllOffsets) {
+ request.setTopics(null);
+ } else {
+ request.setTopics(List.of(new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List.of(0))
+ ));
}
- TriFunction<RequestContext,
OffsetFetchRequestData.OffsetFetchRequestGroup, Boolean,
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>>
fetchOffsets =
- fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets;
- CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>
future = fetchOffsets.apply(
+ CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>
future = service.fetchOffsets(
requestContext(ApiKeys.OFFSET_FETCH),
request,
requireStable
@@ -1834,11 +1836,14 @@ public class GroupCoordinatorServiceTest {
OffsetFetchRequestData.OffsetFetchRequestGroup request =
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("group");
- if (!fetchAllOffsets) {
- request
- .setTopics(List.of(new
OffsetFetchRequestData.OffsetFetchRequestTopics()
- .setName("foo")
- .setPartitionIndexes(List.of(0))));
+
+ if (fetchAllOffsets) {
+ request.setTopics(null);
+ } else {
+ request.setTopics(List.of(new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List.of(0))
+ ));
}
when(runtime.scheduleWriteOperation(
@@ -1848,9 +1853,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(new
CompletionException(error.exception())));
- TriFunction<RequestContext,
OffsetFetchRequestData.OffsetFetchRequestGroup, Boolean,
CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>>
fetchOffsets =
- fetchAllOffsets ? service::fetchAllOffsets : service::fetchOffsets;
- CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>
future = fetchOffsets.apply(
+ CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup>
future = service.fetchOffsets(
requestContext(ApiKeys.OFFSET_FETCH),
request,
true
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 4dc0c8ff78c..2a6330e0d66 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -28,6 +28,8 @@ import
org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
@@ -99,6 +101,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
@@ -2384,4 +2387,49 @@ public class GroupCoordinatorShardTest {
assertEquals(expectedResult,
coordinator.completeDeleteShareGroupOffsets(groupId, topics,
errorTopicResponseList));
verify(groupMetadataManager,
times(1)).completeDeleteShareGroupOffsets(any(), any(), any());
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testFetchOffsets(boolean fetchAllOffsets) {
+ var offsetMetadataManager = mock(OffsetMetadataManager.class);
+ var coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ mock(GroupMetadataManager.class),
+ offsetMetadataManager,
+ Time.SYSTEM,
+ new MockCoordinatorTimer<>(Time.SYSTEM),
+ mock(GroupCoordinatorConfig.class),
+ mock(CoordinatorMetrics.class),
+ mock(CoordinatorMetricsShard.class)
+ );
+
+ var request = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("foo");
+
+ if (fetchAllOffsets) {
+ request.setTopics(null);
+ } else {
+ request.setTopics(List.of(new
OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List.of(0))
+ ));
+ }
+
+ var result = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("foo");
+
+ if (fetchAllOffsets) {
+ when(offsetMetadataManager.fetchAllOffsets(
+ request,
+ Long.MAX_VALUE
+ )).thenReturn(result);
+ } else {
+ when(offsetMetadataManager.fetchOffsets(
+ request,
+ Long.MAX_VALUE
+ )).thenReturn(result);
+ }
+
+ assertEquals(result, coordinator.fetchOffsets(request,
Long.MAX_VALUE));
+ }
}