This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1146f97770c KAFKA-19800: Compute share partition lag in
GroupCoordinatorService (#20839)
1146f97770c is described below
commit 1146f97770c2c25ccf189563c419325f23dde3ec
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Tue Nov 11 20:44:58 2025 +0530
KAFKA-19800: Compute share partition lag in GroupCoordinatorService (#20839)
This PR is part of
[KIP-1226](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1226%3A+Introducing+Share+Partition+Lag+Persistence+and+Retrieval).
This PR computes the share partition lag in GroupCoordinatorService
using `deliveryCompleteCount` received from `readSummary`, and partition
end offsets received from `adminClient.listOffstes`. The computed lag is
returned to the end user in DescribeShareGroupOffsetsResponse.
NOTE: The GroupCoordinator is built with a no-op implementation of
PartitionMetadataClient, which returns -1 as the partition end offset
for any requested topic partition. This will later be replaced with an
actual implementation that uses InterBrokerSendThread to retrieve
partition end offsets via ListOffsets RPC.
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
<[email protected]>
---
.../internals/ListShareGroupOffsetsHandler.java | 3 +-
.../message/DescribeShareGroupOffsetsRequest.json | 5 +-
.../message/DescribeShareGroupOffsetsResponse.json | 7 +-
.../kafka/common/requests/RequestResponseTest.java | 1 +
.../src/main/scala/kafka/server/BrokerServer.scala | 26 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 1 +
.../scala/unit/kafka/server/KafkaApisTest.scala | 30 ++
.../coordinator/group/GroupCoordinatorService.java | 148 +++++++--
.../coordinator/group/PartitionMetadataClient.java | 38 +++
.../group/GroupCoordinatorServiceTest.java | 331 ++++++++++++++++++++-
.../server/share/persister/PartitionFactory.java | 1 +
11 files changed, 565 insertions(+), 26 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
index 5b75a482aae..2d033510d7f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
@@ -133,11 +133,12 @@ public class ListShareGroupOffsetsHandler extends
AdminApiHandler.Batched<Coordi
if (partitionResponse.errorCode() ==
Errors.NONE.code()) {
final long startOffset =
partitionResponse.startOffset();
final Optional<Integer> leaderEpoch =
partitionResponse.leaderEpoch() < 0 ? Optional.empty() :
Optional.of(partitionResponse.leaderEpoch());
+ final Optional<Long> lag =
partitionResponse.lag() < 0 ? Optional.empty() :
Optional.of(partitionResponse.lag());
// Negative offset indicates there is no start
offset for this partition
if (partitionResponse.startOffset() < 0) {
groupOffsetsListing.put(tp, null);
} else {
- groupOffsetsListing.put(tp, new
SharePartitionOffsetInfo(startOffset, leaderEpoch, Optional.empty()));
+ groupOffsetsListing.put(tp, new
SharePartitionOffsetInfo(startOffset, leaderEpoch, lag));
}
} else {
log.warn("Skipping return offset for {} due to
error {}: {}.", tp, partitionResponse.errorCode(),
partitionResponse.errorMessage());
diff --git
a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
index f87c1fc394c..c1a9544a82a 100644
---
a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
+++
b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
@@ -18,7 +18,10 @@
"type": "request",
"listeners": ["broker"],
"name": "DescribeShareGroupOffsetsRequest",
- "validVersions": "0",
+ // Version 0 is the initial version (KIP-932).
+ //
+ // Version 1 introduces Lag in the response (KIP-1226).
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "Groups", "type": "[]DescribeShareGroupOffsetsRequestGroup",
"versions": "0+",
diff --git
a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json
b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json
index 692a2657424..f036de04e1a 100644
---
a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json
+++
b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json
@@ -17,7 +17,10 @@
"apiKey": 90,
"type": "response",
"name": "DescribeShareGroupOffsetsResponse",
- "validVersions": "0",
+ // Version 0 is the initial version (KIP-932).
+ //
+ // Version 1 introduces Lag (KIP-1226).
+ "validVersions": "0-1",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -48,6 +51,8 @@
"about": "The share-partition start offset." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The leader epoch of the partition." },
+ { "name": "Lag", "type": "int64", "versions": "1+", "ignorable":
"true", "default": -1,
+ "about": "The share-partition lag." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The partition-level error code, or 0 if there was no
error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 596828a0839..7834db2e5fa 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -3841,6 +3841,7 @@ public class RequestResponseTest {
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(0)))))));
return new DescribeShareGroupOffsetsResponse(data);
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 25a46d4f37d..5772a2879bc 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoaderImpl,
CoordinatorRecord}
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics,
GroupCoordinatorRuntimeMetrics}
-import org.apache.kafka.coordinator.group.{GroupConfigManager,
GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService}
+import org.apache.kafka.coordinator.group.{GroupConfigManager,
GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService,
PartitionMetadataClient}
import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics,
ShareCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorRecordSerde, ShareCoordinatorService}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
@@ -119,6 +119,8 @@ class BrokerServer(
var credentialProvider: CredentialProvider = _
var tokenCache: DelegationTokenCache = _
+ var partitionMetadataClient: PartitionMetadataClient = _
+
@volatile var groupCoordinator: GroupCoordinator = _
var groupConfigManager: GroupConfigManager = _
@@ -371,6 +373,8 @@ class BrokerServer(
/* create persister */
persister = createShareStatePersister()
+ partitionMetadataClient = createPartitionMetadataClient()
+
groupCoordinator = createGroupCoordinator()
val producerIdManagerSupplier = () => ProducerIdManager.rpc(
@@ -620,6 +624,25 @@ class BrokerServer(
}
}
+ private def createPartitionMetadataClient(): PartitionMetadataClient = {
+ // This is a no-op implementation of PartitionMetadataClient. It always
returns -1 as the latest offset for any
+ // requested topic partition.
+ // TODO: KAFKA-19800: Implement a real PartitionMetadataClient that can
fetch latest offsets via InterBrokerSendThread.
+ new PartitionMetadataClient {
+ override def listLatestOffsets(topicPartitions: util.Set[TopicPartition]
+ ): util.Map[TopicPartition,
util.concurrent.CompletableFuture[java.lang.Long]] = {
+ topicPartitions.asScala
+ .map { tp =>
+ tp ->
CompletableFuture.completedFuture(java.lang.Long.valueOf(-1L))
+ }
+ .toMap
+ .asJava
+ }
+
+ override def close(): Unit = {}
+ }
+ }
+
private def createGroupCoordinator(): GroupCoordinator = {
// Create group coordinator, but don't start it until we've started
replica manager.
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it
would be good
@@ -651,6 +674,7 @@ class BrokerServer(
.withGroupConfigManager(groupConfigManager)
.withPersister(persister)
.withAuthorizerPlugin(authorizerPlugin.toJava)
+ .withPartitionMetadataClient(partitionMetadataClient)
.build()
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index ea080147b53..c31c404eef9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3820,6 +3820,7 @@ class KafkaApis(val requestChannel: RequestChannel,
topicResponse.partitions.add(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionIndex)
.setStartOffset(-1)
+ .setLag(-1)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message))
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d3c11408465..96ea2ae8858 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -12717,18 +12717,21 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
@@ -12740,12 +12743,14 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(10)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(20)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
@@ -12762,6 +12767,7 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
@@ -12861,18 +12867,21 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
@@ -12884,12 +12893,14 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(10)
.setStartOffset(-1)
+ .setLag(-1)
.setLeaderEpoch(0)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(20)
.setStartOffset(-1)
+ .setLag(-1)
.setLeaderEpoch(0)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
@@ -12906,6 +12917,7 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setStartOffset(-1)
+ .setLag(-1)
.setLeaderEpoch(0)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
@@ -12926,18 +12938,21 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
@@ -13022,18 +13037,21 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
@@ -13055,18 +13073,21 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
@@ -13078,12 +13099,14 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(10)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(20)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
@@ -13100,6 +13123,7 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
@@ -13160,18 +13184,21 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(3)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
@@ -13183,12 +13210,14 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(10)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0),
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(20)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
@@ -13205,6 +13234,7 @@ class KafkaApisTest extends Logging {
new DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setStartOffset(0)
+ .setLag(0)
.setLeaderEpoch(1)
.setErrorMessage(null)
.setErrorCode(0)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 9c300c529b5..3ffb2d3e644 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -107,8 +107,10 @@ import
org.apache.kafka.server.share.persister.InitializeShareGroupStateResult;
import org.apache.kafka.server.share.persister.PartitionErrorData;
import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.PartitionStateData;
+import org.apache.kafka.server.share.persister.PartitionStateSummaryData;
import org.apache.kafka.server.share.persister.Persister;
import
org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
+import
org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryResult;
import org.apache.kafka.server.share.persister.TopicData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
@@ -163,6 +165,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
private GroupConfigManager groupConfigManager;
private Persister persister;
private Optional<Plugin<Authorizer>> authorizerPlugin;
+ private PartitionMetadataClient partitionMetadataClient;
public Builder(
int nodeId,
@@ -217,6 +220,11 @@ public class GroupCoordinatorService implements
GroupCoordinator {
return this;
}
+ public Builder withPartitionMetadataClient(PartitionMetadataClient
partitionMetadataClient) {
+ this.partitionMetadataClient = partitionMetadataClient;
+ return this;
+ }
+
public GroupCoordinatorService build() {
requireNonNull(config, "Config must be set.");
requireNonNull(writer, "Writer must be set.");
@@ -228,6 +236,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
requireNonNull(groupConfigManager, "GroupConfigManager must be
set.");
requireNonNull(persister, "Persister must be set.");
requireNonNull(authorizerPlugin, "Authorizer must be set.");
+ requireNonNull(partitionMetadataClient, "PartitionMetadataClient
must be set.");
String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
LogContext logContext = new LogContext(String.format("[%s] ",
logPrefix));
@@ -270,7 +279,8 @@ public class GroupCoordinatorService implements
GroupCoordinator {
groupCoordinatorMetrics,
groupConfigManager,
persister,
- timer
+ timer,
+ partitionMetadataClient
);
}
}
@@ -320,6 +330,11 @@ public class GroupCoordinatorService implements
GroupCoordinator {
*/
private final Set<String> consumerGroupAssignors;
+ /**
+ * The client used for getting partition end offsets
+ */
+ private final PartitionMetadataClient partitionMetadataClient;
+
/**
* The number of partitions of the __consumer_offsets topics. This is
provided
* when the component is started.
@@ -349,7 +364,8 @@ public class GroupCoordinatorService implements
GroupCoordinator {
GroupCoordinatorMetrics groupCoordinatorMetrics,
GroupConfigManager groupConfigManager,
Persister persister,
- Timer timer
+ Timer timer,
+ PartitionMetadataClient partitionMetadataClient
) {
this.log = logContext.logger(GroupCoordinatorService.class);
this.config = config;
@@ -363,6 +379,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
.stream()
.map(ConsumerGroupPartitionAssignor::name)
.collect(Collectors.toSet());
+ this.partitionMetadataClient = partitionMetadataClient;
}
/**
@@ -1835,27 +1852,122 @@ public class GroupCoordinatorService implements
GroupCoordinator {
return;
}
- // Return -1 (uninitialized offset) for the situation where
the persister returned an error.
- // This is consistent with OffsetFetch for situations in which
there is no offset information to fetch.
- // It's treated as absence of data, rather than an error.
- result.topicsData().forEach(topicData ->
- describeShareGroupOffsetsResponseTopicList.add(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
- .setTopicId(topicData.topicId())
-
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
- .setPartitions(topicData.partitions().stream().map(
- partitionData -> new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ // Now compute lag for each partition and build the final
response.
+ computeShareGroupLagAndBuildResponse(
+ result,
+ requestTopicIdToNameMapping,
+ describeShareGroupOffsetsResponseTopicList,
+ future,
+ readSummaryRequestData.groupId()
+ );
+ });
+ return future;
+ }
+
+ private void computeShareGroupLagAndBuildResponse(
+ ReadShareGroupStateSummaryResult readSummaryResult,
+ Map<Uuid, String> requestTopicIdToNameMapping,
+
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>
describeShareGroupOffsetsResponseTopicList,
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
responseFuture,
+ String groupId
+ ) {
+ // This set keeps track of the partitions for which lag computation is
needed.
+ Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+
+ readSummaryResult.topicsData().forEach(topicData -> {
+ topicData.partitions().forEach(partitionData -> {
+ if (shouldComputeSharePartitionLag(partitionData)) {
+ // If the readSummaryResult is successful for a partition,
we need to compute lag.
+ partitionsToComputeLag.add(new
TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()),
partitionData.partition()));
+ }
+ });
+ });
+
+ // Fetch latest offsets for all partitions that need lag computation.
+ Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets =
partitionsToComputeLag.isEmpty() ? Map.of() :
+
partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
+
+ // Final response object to be built. It will include lag information
computed from partitionMetadataClient.
+
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup
responseGroup =
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId(groupId);
+
+ // List of response topics to be set in the response group.
+
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>
responseTopics = new ArrayList<>();
+
+ CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new
CompletableFuture<?>[0]))
+ .whenComplete((result, error) -> {
+ // The error variable will not be null when one or more of the
partitionLatestOffsets futures get completed exceptionally.
+ // If that is the case, then the same exception would be
caught in the try catch executed below when .join() is called.
+ // Thus, we do not need to check error != null here.
+ readSummaryResult.topicsData().forEach(topicData -> {
+ // Build response for each topic.
+
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic
topic =
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicId(topicData.topicId())
+
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()));
+
+ // Build response for each partition within the topic.
+
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition>
partitionResponses = new ArrayList<>();
+
+ topicData.partitions().forEach(partitionData -> {
+ TopicPartition tp = new
TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()),
partitionData.partition());
+ // For the partitions where lag computation is not
needed, a partitionResponse is built directly.
+ // The lag is set to -1 (uninitialized lag) in these
cases. If the persister returned an error for a
+ // partition, the startOffset is set to -1
(uninitialized offset) and the leaderEpoch is set to 0
+ // (default epoch). This is consistent with
OffsetFetch for situations in which there is no offset
+ // information to fetch. It's treated as absence of
data, rather than an error
+ if (!shouldComputeSharePartitionLag(partitionData)) {
+ partitionResponses.add(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partition())
.setStartOffset(partitionData.errorCode() ==
Errors.NONE.code() ? partitionData.startOffset() :
PartitionFactory.UNINITIALIZED_START_OFFSET)
.setLeaderEpoch(partitionData.errorCode() ==
Errors.NONE.code() ? partitionData.leaderEpoch() :
PartitionFactory.DEFAULT_LEADER_EPOCH)
- ).toList())
- ));
+ .setLag(PartitionFactory.UNINITIALIZED_LAG));
+ } else {
+ try {
+ // This code is reached when allOf above is
complete, which happens when all the
+ // individual futures are complete. Thus, the
call to join() here is safe.
+ long partitionLatestOffset =
partitionLatestOffsets.get(tp).join();
+ // Compute lag as (partition end offset -
startOffset - deliveryCompleteCount).
+ // Note, partition end offset, which is
retrieved from partitionMetadataClient, is the offset of
+ // the next message to be produced, not the
last message offset. Thus, the formula for lag computation
+ // does not need a +1 adjustment.
+ long lag = partitionLatestOffset -
partitionData.startOffset() - partitionData.deliveryCompleteCount();
+ partitionResponses.add(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+
.setPartitionIndex(partitionData.partition())
+
.setStartOffset(partitionData.startOffset())
+
.setLeaderEpoch(partitionData.leaderEpoch())
+ .setLag(lag));
+ } catch (CompletionException e) {
+ // If fetching latest offset for a partition
failed, return the error in the response for that partition.
+ partitionResponses.add(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+
.setPartitionIndex(partitionData.partition())
+
.setErrorCode(Errors.forException(e.getCause()).code())
+
.setErrorMessage(e.getCause().getMessage()));
+ }
+ }
+ });
+ topic.setPartitions(partitionResponses);
+ responseTopics.add(topic);
+ });
- future.complete(
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
- .setGroupId(readSummaryRequestData.groupId())
-
.setTopics(describeShareGroupOffsetsResponseTopicList));
+ // Add topics which did not exist in the metadata image and
were handled earlier.
+
responseTopics.addAll(describeShareGroupOffsetsResponseTopicList);
+ // Set topics in the response group.
+ responseGroup.setTopics(responseTopics);
+ // Complete the future with the built response.
+ responseFuture.complete(responseGroup);
});
- return future;
+ }
+
+ private boolean shouldComputeSharePartitionLag(PartitionStateSummaryData
partitionData) {
+ // The share partition lag would be computed for a share partition ony
if -
+ // 1. The read summary result for the partition is successful.
+ // 3. The start offset is initialized.
+ // 4. The delivery complete count is initialized.
+ return partitionData.errorCode() == Errors.NONE.code() &&
+ partitionData.startOffset() !=
PartitionFactory.UNINITIALIZED_START_OFFSET &&
+ partitionData.deliveryCompleteCount() !=
PartitionFactory.UNINITIALIZED_DELIVERY_COMPLETE_COUNT;
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java
new file mode 100644
index 00000000000..ea6d940143f
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Client interface for retrieving latest offsets for topic partitions.
+ */
+public interface PartitionMetadataClient extends AutoCloseable {
+ /**
+ * Lists the latest offsets for the provided topic partitions.
+ *
+ * @param topicPartitions A set of topic partitions.
+ * @return A map of topic partitions to the completableFuture of their
latest offsets
+ */
+ Map<TopicPartition, CompletableFuture<Long>> listLatestOffsets(
+ Set<TopicPartition> topicPartitions
+ );
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 01832467b0f..1a4efc6d989 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -128,6 +128,7 @@ import org.mockito.ArgumentMatchers;
import java.net.InetAddress;
import java.time.Duration;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -3714,7 +3715,8 @@ public class GroupCoordinatorServiceTest {
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
-
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET))))
+
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+ .setLag(PartitionFactory.UNINITIALIZED_LAG))))
);
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
@@ -3727,13 +3729,21 @@ public class GroupCoordinatorServiceTest {
public void testDescribeShareGroupOffsetsWithDefaultPersister() throws
InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
+
+ PartitionMetadataClient partitionMetadataClient =
mock(PartitionMetadataClient.class);
+
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
+ .setPartitionMetadataClient(partitionMetadataClient)
.build(true);
service.startup(() -> 1);
+ Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new
TopicPartition(TOPIC_NAME, 1)));
+ when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
+ .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1),
CompletableFuture.completedFuture(41L)));
+
int partition = 1;
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id")
@@ -3757,7 +3767,8 @@ public class GroupCoordinatorServiceTest {
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
- .setStartOffset(21))))
+ .setStartOffset(21)
+ .setLag(10L))))
);
ReadShareGroupStateSummaryResponseData
readShareGroupStateSummaryResponseData = new
ReadShareGroupStateSummaryResponseData()
@@ -3767,6 +3778,7 @@ public class GroupCoordinatorServiceTest {
.setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition)
.setStartOffset(21)
+ .setDeliveryCompleteCount(10)
.setStateEpoch(1)))
)
);
@@ -3903,6 +3915,144 @@ public class GroupCoordinatorServiceTest {
assertFutureThrows(IllegalStateException.class, future, "Result is
null for the read state summary");
}
+ @Test
+ public void
testDescribeShareGroupOffsetsWithDefaultPersisterReadSummaryPartitionError()
throws InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ ReadShareGroupStateSummaryRequestData
readShareGroupStateSummaryRequestData = new
ReadShareGroupStateSummaryRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartition(partition)))));
+
+
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup
responseData = new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("share-group-id")
+ .setTopics(
+ List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+ .setLeaderEpoch(PartitionFactory.DEFAULT_LEADER_EPOCH)
+ .setLag(PartitionFactory.UNINITIALIZED_LAG))))
+ );
+
+ ReadShareGroupStateSummaryResponseData
readShareGroupStateSummaryResponseData = new
ReadShareGroupStateSummaryResponseData()
+ .setResults(
+ List.of(new
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartition(partition)
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+ .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
+ ))
+ )
+ );
+
+ ReadShareGroupStateSummaryParameters
readShareGroupStateSummaryParameters =
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+ ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+ when(persister.readSummary(
+ ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void
testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetError() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+
+ PartitionMetadataClient partitionMetadataClient =
mock(PartitionMetadataClient.class);
+
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .setPartitionMetadataClient(partitionMetadataClient)
+ .build(true);
+ service.startup(() -> 1);
+
+ Exception ex = new Exception("failure");
+
+ Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new
TopicPartition(TOPIC_NAME, 1)));
+ when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
+ .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1),
CompletableFuture.failedFuture(ex)));
+
+ int partition = 1;
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ ReadShareGroupStateSummaryRequestData
readShareGroupStateSummaryRequestData = new
ReadShareGroupStateSummaryRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartition(partition)))));
+
+
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup
responseData = new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("share-group-id")
+ .setTopics(
+ List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(Errors.forException(ex).code())
+ .setErrorMessage(ex.getMessage())
+ ))
+ )
+ );
+
+ ReadShareGroupStateSummaryResponseData
readShareGroupStateSummaryResponseData = new
ReadShareGroupStateSummaryResponseData()
+ .setResults(
+ List.of(new
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartition(partition)
+ .setStartOffset(21)
+ .setDeliveryCompleteCount(10)
+ .setStateEpoch(1)))
+ )
+ );
+
+ ReadShareGroupStateSummaryParameters
readShareGroupStateSummaryParameters =
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+ ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+ when(persister.readSummary(
+ ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
@Test
public void testDescribeShareGroupOffsetsCoordinatorNotActive() throws
ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
@@ -3964,12 +4114,20 @@ public class GroupCoordinatorServiceTest {
public void testDescribeShareGroupAllOffsets() throws
InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
+
+ PartitionMetadataClient partitionMetadataClient =
mock(PartitionMetadataClient.class);
+
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
+ .setPartitionMetadataClient(partitionMetadataClient)
.build(true);
+ Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new
TopicPartition(TOPIC_NAME, 1)));
+ when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
+ .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1),
CompletableFuture.completedFuture(41L)));
+
MetadataImage image = new MetadataImageBuilder()
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
.build();
@@ -4003,7 +4161,8 @@ public class GroupCoordinatorServiceTest {
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
- .setStartOffset(21))))
+ .setStartOffset(21)
+ .setLag(10L))))
);
ReadShareGroupStateSummaryResponseData
readShareGroupStateSummaryResponseData = new
ReadShareGroupStateSummaryResponseData()
@@ -4013,6 +4172,7 @@ public class GroupCoordinatorServiceTest {
.setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition)
.setStartOffset(21)
+ .setDeliveryCompleteCount(10)
.setStateEpoch(1)))
)
);
@@ -4101,6 +4261,162 @@ public class GroupCoordinatorServiceTest {
assertFutureThrows(IllegalStateException.class, future, "Result is
null for the read state summary");
}
+ @Test
+ public void testDescribeShareGroupAllOffsetsReadSummaryPartitionError()
throws InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+
+ MetadataImage image = new MetadataImageBuilder()
+ .addTopic(TOPIC_ID, TOPIC_NAME, 3)
+ .build();
+
+ service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image),
null);
+
+ int partition = 1;
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-initialized-partitions"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID,
Set.of(partition))));
+
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId("share-group-id")
+ .setTopics(null);
+
+ ReadShareGroupStateSummaryRequestData
readShareGroupStateSummaryRequestData = new
ReadShareGroupStateSummaryRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartition(partition)))));
+
+
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup
responseData = new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("share-group-id")
+ .setTopics(
+ List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+ .setLeaderEpoch(PartitionFactory.DEFAULT_LEADER_EPOCH)
+ .setLag(PartitionFactory.UNINITIALIZED_LAG))))
+ );
+
+ ReadShareGroupStateSummaryResponseData
readShareGroupStateSummaryResponseData = new
ReadShareGroupStateSummaryResponseData()
+ .setResults(
+ List.of(new
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartition(partition)
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+ .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
+ ))
+ )
+ );
+
+ ReadShareGroupStateSummaryParameters
readShareGroupStateSummaryParameters =
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+ ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+ when(persister.readSummary(
+ ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
+
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDescribeShareGroupAllOffsetsLatestOffsetError() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+
+ PartitionMetadataClient partitionMetadataClient =
mock(PartitionMetadataClient.class);
+
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .setPartitionMetadataClient(partitionMetadataClient)
+ .build(true);
+
+ Exception ex = new Exception("failure");
+
+ Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new
TopicPartition(TOPIC_NAME, 1)));
+ when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
+ .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1),
CompletableFuture.failedFuture(ex)));
+
+ MetadataImage image = new MetadataImageBuilder()
+ .addTopic(TOPIC_ID, TOPIC_NAME, 3)
+ .build();
+
+ service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image),
null);
+
+ int partition = 1;
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-initialized-partitions"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID,
Set.of(partition))));
+
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId("share-group-id")
+ .setTopics(null);
+
+ ReadShareGroupStateSummaryRequestData
readShareGroupStateSummaryRequestData = new
ReadShareGroupStateSummaryRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartition(partition)))));
+
+
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup
responseData = new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("share-group-id")
+ .setTopics(
+ List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(Errors.forException(ex).code())
+ .setErrorMessage(ex.getMessage())
+ ))
+ )
+ );
+
+ ReadShareGroupStateSummaryResponseData
readShareGroupStateSummaryResponseData = new
ReadShareGroupStateSummaryResponseData()
+ .setResults(
+ List.of(new
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartition(partition)
+ .setStartOffset(21)
+ .setDeliveryCompleteCount(10)
+ .setStateEpoch(1)))
+ )
+ );
+
+ ReadShareGroupStateSummaryParameters
readShareGroupStateSummaryParameters =
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+ ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+ when(persister.readSummary(
+ ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
+
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
@Test
public void testDescribeShareGroupAllOffsetsCoordinatorNotActive() throws
ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
@@ -5650,6 +5966,7 @@ public class GroupCoordinatorServiceTest {
private GroupCoordinatorMetrics metrics = new
GroupCoordinatorMetrics();
private Persister persister = new NoOpStatePersister();
private CoordinatorMetadataImage metadataImage = null;
+ private PartitionMetadataClient partitionMetadataClient = null;
GroupCoordinatorService build() {
return build(false);
@@ -5667,7 +5984,8 @@ public class GroupCoordinatorServiceTest {
metrics,
configManager,
persister,
- new MockTimer()
+ new MockTimer(),
+ partitionMetadataClient
);
if (serviceStartup) {
@@ -5703,6 +6021,11 @@ public class GroupCoordinatorServiceTest {
this.metrics = metrics;
return this;
}
+
+ public GroupCoordinatorServiceBuilder
setPartitionMetadataClient(PartitionMetadataClient partitionMetadataClient) {
+ this.partitionMetadataClient = partitionMetadataClient;
+ return this;
+ }
}
private static DeleteShareGroupStateParameters
createDeleteShareRequest(String groupId, Uuid topic, List<Integer> partitions) {
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
index ed998bb0b1f..215e95ff085 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
@@ -28,6 +28,7 @@ public class PartitionFactory {
public static final int DEFAULT_STATE_EPOCH = 0;
public static final int UNINITIALIZED_START_OFFSET = -1;
public static final int UNINITIALIZED_DELIVERY_COMPLETE_COUNT = -1;
+ public static final long UNINITIALIZED_LAG = -1;
public static final short DEFAULT_ERROR_CODE = Errors.NONE.code();
public static final int DEFAULT_LEADER_EPOCH = 0;
public static final String DEFAULT_ERR_MESSAGE = Errors.NONE.message();