This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1146f97770c KAFKA-19800: Compute share partition lag in GroupCoordinatorService (#20839)
1146f97770c is described below

commit 1146f97770c2c25ccf189563c419325f23dde3ec
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Tue Nov 11 20:44:58 2025 +0530

    KAFKA-19800: Compute share partition lag in GroupCoordinatorService (#20839)
    
    This PR is part of
    [KIP-1226](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1226%3A+Introducing+Share+Partition+Lag+Persistence+and+Retrieval).
    
    This PR computes the share partition lag in GroupCoordinatorService
    using `deliveryCompleteCount` received from `readSummary`, and partition
    end offsets received from `adminClient.listOffsets`. The computed lag is
    returned to the end user in DescribeShareGroupOffsetsResponse.
    
    NOTE: The GroupCoordinator is built with a no-op implementation of
    PartitionMetadataClient, which returns -1 as the partition end offset
    for any requested topic partition. This will later be replaced with an
    actual implementation that uses InterBrokerSendThread to retrieve
    partition end offsets via ListOffsets RPC.
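    
    As an illustration of the computation (an editorial sketch; the variable
    names and values below are hypothetical, chosen to mirror the unit tests
    in this commit):
    
        long endOffset = 41L;             // partition end offset, from PartitionMetadataClient.listLatestOffsets
        long startOffset = 21L;           // from the persister's readSummary
        long deliveryCompleteCount = 10L; // also from readSummary
        // The end offset is the offset of the next record to be produced,
        // so the formula needs no +1 adjustment.
        long lag = endOffset - startOffset - deliveryCompleteCount; // lag == 10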
    
    Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>
---
 .../internals/ListShareGroupOffsetsHandler.java    |   3 +-
 .../message/DescribeShareGroupOffsetsRequest.json  |   5 +-
 .../message/DescribeShareGroupOffsetsResponse.json |   7 +-
 .../kafka/common/requests/RequestResponseTest.java |   1 +
 .../src/main/scala/kafka/server/BrokerServer.scala |  26 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   1 +
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  30 ++
 .../coordinator/group/GroupCoordinatorService.java | 148 +++++++--
 .../coordinator/group/PartitionMetadataClient.java |  38 +++
 .../group/GroupCoordinatorServiceTest.java         | 331 ++++++++++++++++++++-
 .../server/share/persister/PartitionFactory.java   |   1 +
 11 files changed, 565 insertions(+), 26 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
index 5b75a482aae..2d033510d7f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
@@ -133,11 +133,12 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coordi
                             if (partitionResponse.errorCode() == Errors.NONE.code()) {
                                 final long startOffset = partitionResponse.startOffset();
                                 final Optional<Integer> leaderEpoch = partitionResponse.leaderEpoch() < 0 ? Optional.empty() : Optional.of(partitionResponse.leaderEpoch());
+                                final Optional<Long> lag = partitionResponse.lag() < 0 ? Optional.empty() : Optional.of(partitionResponse.lag());
                                 // Negative offset indicates there is no start offset for this partition
                                 if (partitionResponse.startOffset() < 0) {
                                     groupOffsetsListing.put(tp, null);
                                 } else {
-                                    groupOffsetsListing.put(tp, new SharePartitionOffsetInfo(startOffset, leaderEpoch, Optional.empty()));
+                                    groupOffsetsListing.put(tp, new SharePartitionOffsetInfo(startOffset, leaderEpoch, lag));
                                 }
                             } else {
                                 log.warn("Skipping return offset for {} due to error {}: {}.", tp, partitionResponse.errorCode(), partitionResponse.errorMessage());
diff --git a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
index f87c1fc394c..c1a9544a82a 100644
--- a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
@@ -18,7 +18,10 @@
   "type": "request",
   "listeners": ["broker"],
   "name": "DescribeShareGroupOffsetsRequest",
-  "validVersions": "0",
+  // Version 0 is the initial version (KIP-932).
+  //
+  // Version 1 introduces Lag in the response (KIP-1226).
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
    { "name": "Groups", "type": "[]DescribeShareGroupOffsetsRequestGroup", "versions": "0+",
diff --git a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json
index 692a2657424..f036de04e1a 100644
--- a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json
@@ -17,7 +17,10 @@
   "apiKey": 90,
   "type": "response",
   "name": "DescribeShareGroupOffsetsResponse",
-  "validVersions": "0",
+  // Version 0 is the initial version (KIP-932).
+  //
+  // Version 1 introduces Lag (KIP-1226).
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   // Supported errors:
   // - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -48,6 +51,8 @@
             "about": "The share-partition start offset." },
           { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
             "about": "The leader epoch of the partition." },
+          { "name": "Lag", "type": "int64", "versions": "1+", "ignorable": "true", "default": -1,
+            "about": "The share-partition lag." },
          { "name": "ErrorCode", "type": "int16", "versions": "0+",
            "about": "The partition-level error code, or 0 if there was no error." },
          { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 596828a0839..7834db2e5fa 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -3841,6 +3841,7 @@ public class RequestResponseTest {
                     .setPartitionIndex(0)
                     .setErrorCode(Errors.NONE.code())
                     .setStartOffset(0)
+                    .setLag(0)
                     .setLeaderEpoch(0)))))));
         return new DescribeShareGroupOffsetsResponse(data);
     }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 25a46d4f37d..5772a2879bc 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoaderImpl, CoordinatorRecord}
 import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
-import org.apache.kafka.coordinator.group.{GroupConfigManager, GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService}
+import org.apache.kafka.coordinator.group.{GroupConfigManager, GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService, PartitionMetadataClient}
 import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, ShareCoordinatorRuntimeMetrics}
 import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde, ShareCoordinatorService}
 import org.apache.kafka.coordinator.transaction.ProducerIdManager
@@ -119,6 +119,8 @@ class BrokerServer(
   var credentialProvider: CredentialProvider = _
   var tokenCache: DelegationTokenCache = _
 
+  var partitionMetadataClient: PartitionMetadataClient = _
+
   @volatile var groupCoordinator: GroupCoordinator = _
 
   var groupConfigManager: GroupConfigManager = _
@@ -371,6 +373,8 @@ class BrokerServer(
       /* create persister */
       persister = createShareStatePersister()
 
+      partitionMetadataClient = createPartitionMetadataClient()
+
       groupCoordinator = createGroupCoordinator()
 
       val producerIdManagerSupplier = () => ProducerIdManager.rpc(
@@ -620,6 +624,25 @@ class BrokerServer(
     }
   }
 
+  private def createPartitionMetadataClient(): PartitionMetadataClient = {
+    // This is a no-op implementation of PartitionMetadataClient. It always returns -1 as the latest offset for any
+    // requested topic partition.
+    // TODO: KAFKA-19800: Implement a real PartitionMetadataClient that can fetch latest offsets via InterBrokerSendThread.
+    new PartitionMetadataClient {
+      override def listLatestOffsets(topicPartitions: util.Set[TopicPartition]
+                                    ): util.Map[TopicPartition, util.concurrent.CompletableFuture[java.lang.Long]] = {
+        topicPartitions.asScala
+          .map { tp =>
+            tp -> CompletableFuture.completedFuture(java.lang.Long.valueOf(-1L))
+          }
+          .toMap
+          .asJava
+      }
+
+      override def close(): Unit = {}
+    }
+  }
+
   private def createGroupCoordinator(): GroupCoordinator = {
     // Create group coordinator, but don't start it until we've started replica manager.
     // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good
@@ -651,6 +674,7 @@ class BrokerServer(
       .withGroupConfigManager(groupConfigManager)
       .withPersister(persister)
       .withAuthorizerPlugin(authorizerPlugin.toJava)
+      .withPartitionMetadataClient(partitionMetadataClient)
       .build()
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ea080147b53..c31c404eef9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3820,6 +3820,7 @@ class KafkaApis(val requestChannel: RequestChannel,
            topicResponse.partitions.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(partitionIndex)
               .setStartOffset(-1)
+              .setLag(-1)
               .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
               .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message))
           }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d3c11408465..96ea2ae8858 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -12717,18 +12717,21 @@ class KafkaApisTest extends Logging {
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(1)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(2)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(3)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0)
@@ -12740,12 +12743,14 @@ class KafkaApisTest extends Logging {
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(10)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(20)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0)
@@ -12762,6 +12767,7 @@ class KafkaApisTest extends Logging {
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(0)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0)
@@ -12861,18 +12867,21 @@ class KafkaApisTest extends Logging {
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(1)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(2)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(3)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0)
@@ -12884,12 +12893,14 @@ class KafkaApisTest extends Logging {
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(10)
               .setStartOffset(-1)
+              .setLag(-1)
               .setLeaderEpoch(0)
               .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
               .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(20)
               .setStartOffset(-1)
+              .setLag(-1)
               .setLeaderEpoch(0)
               .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
               .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
@@ -12906,6 +12917,7 @@ class KafkaApisTest extends Logging {
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(0)
               .setStartOffset(-1)
+              .setLag(-1)
               .setLeaderEpoch(0)
               .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
               .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
@@ -12926,18 +12938,21 @@ class KafkaApisTest extends Logging {
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(1)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(2)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(3)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0)
@@ -13022,18 +13037,21 @@ class KafkaApisTest extends Logging {
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(1)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(2)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(3)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0)
@@ -13055,18 +13073,21 @@ class KafkaApisTest extends Logging {
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(1)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(2)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(3)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0)
@@ -13078,12 +13099,14 @@ class KafkaApisTest extends Logging {
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(10)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(20)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0)
@@ -13100,6 +13123,7 @@ class KafkaApisTest extends Logging {
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(0)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0)
@@ -13160,18 +13184,21 @@ class KafkaApisTest extends Logging {
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(1)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(2)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(3)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0)
@@ -13183,12 +13210,14 @@ class KafkaApisTest extends Logging {
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(10)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0),
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(20)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0)
@@ -13205,6 +13234,7 @@ class KafkaApisTest extends Logging {
             new DescribeShareGroupOffsetsResponsePartition()
               .setPartitionIndex(0)
               .setStartOffset(0)
+              .setLag(0)
               .setLeaderEpoch(1)
               .setErrorMessage(null)
               .setErrorCode(0)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 9c300c529b5..3ffb2d3e644 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -107,8 +107,10 @@ import org.apache.kafka.server.share.persister.InitializeShareGroupStateResult;
 import org.apache.kafka.server.share.persister.PartitionErrorData;
 import org.apache.kafka.server.share.persister.PartitionFactory;
 import org.apache.kafka.server.share.persister.PartitionStateData;
+import org.apache.kafka.server.share.persister.PartitionStateSummaryData;
 import org.apache.kafka.server.share.persister.Persister;
 import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
+import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryResult;
 import org.apache.kafka.server.share.persister.TopicData;
 import org.apache.kafka.server.util.FutureUtils;
 import org.apache.kafka.server.util.timer.Timer;
@@ -163,6 +165,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
         private GroupConfigManager groupConfigManager;
         private Persister persister;
         private Optional<Plugin<Authorizer>> authorizerPlugin;
+        private PartitionMetadataClient partitionMetadataClient;
 
         public Builder(
             int nodeId,
@@ -217,6 +220,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
             return this;
         }
 
+        public Builder withPartitionMetadataClient(PartitionMetadataClient partitionMetadataClient) {
+            this.partitionMetadataClient = partitionMetadataClient;
+            return this;
+        }
+
         public GroupCoordinatorService build() {
             requireNonNull(config, "Config must be set.");
             requireNonNull(writer, "Writer must be set.");
@@ -228,6 +236,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
             requireNonNull(groupConfigManager, "GroupConfigManager must be set.");
             requireNonNull(persister, "Persister must be set.");
             requireNonNull(authorizerPlugin, "Authorizer must be set.");
+            requireNonNull(partitionMetadataClient, "PartitionMetadataClient must be set.");
 
             String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
             LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
@@ -270,7 +279,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
                 groupCoordinatorMetrics,
                 groupConfigManager,
                 persister,
-                timer
+                timer,
+                partitionMetadataClient
             );
         }
     }
@@ -320,6 +330,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
      */
     private final Set<String> consumerGroupAssignors;
 
+    /**
+     * The client used for getting partition end offsets.
+     */
+    private final PartitionMetadataClient partitionMetadataClient;
+
     /**
      * The number of partitions of the __consumer_offsets topics. This is provided
      * when the component is started.
@@ -349,7 +364,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
         GroupCoordinatorMetrics groupCoordinatorMetrics,
         GroupConfigManager groupConfigManager,
         Persister persister,
-        Timer timer
+        Timer timer,
+        PartitionMetadataClient partitionMetadataClient
     ) {
         this.log = logContext.logger(GroupCoordinatorService.class);
         this.config = config;
@@ -363,6 +379,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
             .stream()
             .map(ConsumerGroupPartitionAssignor::name)
             .collect(Collectors.toSet());
+        this.partitionMetadataClient = partitionMetadataClient;
     }
 
     /**
@@ -1835,27 +1852,122 @@ public class GroupCoordinatorService implements GroupCoordinator {
                     return;
                 }
 
-                // Return -1 (uninitialized offset) for the situation where the persister returned an error.
-                // This is consistent with OffsetFetch for situations in which there is no offset information to fetch.
-                // It's treated as absence of data, rather than an error.
-                result.topicsData().forEach(topicData ->
-                    describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                        .setTopicId(topicData.topicId())
-                        .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
-                        .setPartitions(topicData.partitions().stream().map(
-                            partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                // Now compute lag for each partition and build the final response.
+                computeShareGroupLagAndBuildResponse(
+                    result,
+                    requestTopicIdToNameMapping,
+                    describeShareGroupOffsetsResponseTopicList,
+                    future,
+                    readSummaryRequestData.groupId()
+                );
+            });
+        return future;
+    }
+
+    private void computeShareGroupLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
+        String groupId
+    ) {
+        // This set keeps track of the partitions for which lag computation is needed.
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                if (shouldComputeSharePartitionLag(partitionData)) {
+                    // If the readSummaryResult is successful for a partition, we need to compute lag.
+                    partitionsToComputeLag.add(new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition()));
+                }
+            });
+        });
+
+        // Fetch latest offsets for all partitions that need lag computation.
+        Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets = partitionsToComputeLag.isEmpty() ? Map.of() :
+                partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
+
+        // Final response object to be built. It will include lag information computed from partitionMetadataClient.
+        DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseGroup =
+            new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+                .setGroupId(groupId);
+
+        // List of response topics to be set in the response group.
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> responseTopics = new ArrayList<>();
+
+        CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new CompletableFuture<?>[0]))
+            .whenComplete((result, error) -> {
+                // The error variable will not be null when one or more of the partitionLatestOffsets futures get completed exceptionally.
+                // If that is the case, then the same exception would be caught in the try catch executed below when .join() is called.
+                // Thus, we do not need to check error != null here.
+                readSummaryResult.topicsData().forEach(topicData -> {
+                    // Build response for each topic.
+                    DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topic =
+                        new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                            .setTopicId(topicData.topicId())
+                            .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()));
+
+                    // Build response for each partition within the topic.
+                    List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionResponses = new ArrayList<>();
+
+                    topicData.partitions().forEach(partitionData -> {
+                        TopicPartition tp = new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition());
+                        // For the partitions where lag computation is not needed, a partitionResponse is built directly.
+                        // The lag is set to -1 (uninitialized lag) in these cases. If the persister returned an error for a
+                        // partition, the startOffset is set to -1 (uninitialized offset) and the leaderEpoch is set to 0
+                        // (default epoch). This is consistent with OffsetFetch for situations in which there is no offset
+                        // information to fetch. It's treated as absence of data, rather than an error
+                        if (!shouldComputeSharePartitionLag(partitionData)) {
+                            partitionResponses.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
                                 .setPartitionIndex(partitionData.partition())
                                 .setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET)
                                 .setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH)
-                        ).toList())
-                    ));
+                                .setLag(PartitionFactory.UNINITIALIZED_LAG));
+                        } else {
+                            try {
+                                // This code is reached when allOf above is complete, which happens when all the
+                                // individual futures are complete. Thus, the call to join() here is safe.
+                                long partitionLatestOffset = partitionLatestOffsets.get(tp).join();
+                                // Compute lag as (partition end offset - startOffset - deliveryCompleteCount).
+                                // Note, partition end offset, which is retrieved from partitionMetadataClient, is the offset of
+                                // the next message to be produced, not the last message offset. Thus, the formula for lag computation
+                                // does not need a +1 adjustment.
+                                long lag = partitionLatestOffset - partitionData.startOffset() - partitionData.deliveryCompleteCount();
+                                partitionResponses.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                    .setPartitionIndex(partitionData.partition())
+                                    .setStartOffset(partitionData.startOffset())
+                                    .setLeaderEpoch(partitionData.leaderEpoch())
+                                    .setLag(lag));
+                            } catch (CompletionException e) {
+                                // If fetching latest offset for a partition failed, return the error in the response for that partition.
+                                partitionResponses.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                    .setPartitionIndex(partitionData.partition())
+                                    .setErrorCode(Errors.forException(e.getCause()).code())
+                                    .setErrorMessage(e.getCause().getMessage()));
+                            }
+                        }
+                    });
+                    topic.setPartitions(partitionResponses);
+                    responseTopics.add(topic);
+                });
 
-                future.complete(
-                    new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
-                        .setGroupId(readSummaryRequestData.groupId())
-                        .setTopics(describeShareGroupOffsetsResponseTopicList));
+                // Add topics which did not exist in the metadata image and were handled earlier.
+                responseTopics.addAll(describeShareGroupOffsetsResponseTopicList);
+                // Set topics in the response group.
+                responseGroup.setTopics(responseTopics);
+                // Complete the future with the built response.
+                responseFuture.complete(responseGroup);
             });
-        return future;
+    }
+
+    private boolean shouldComputeSharePartitionLag(PartitionStateSummaryData partitionData) {
+        // The share partition lag would be computed for a share partition only if -
+        // 1. The read summary result for the partition is successful.
+        // 2. The start offset is initialized.
+        // 3. The delivery complete count is initialized.
+        return partitionData.errorCode() == Errors.NONE.code() &&
+            partitionData.startOffset() != PartitionFactory.UNINITIALIZED_START_OFFSET &&
+            partitionData.deliveryCompleteCount() != PartitionFactory.UNINITIALIZED_DELIVERY_COMPLETE_COUNT;
     }
 
     /**
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java
new file mode 100644
index 00000000000..ea6d940143f
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Client interface for retrieving latest offsets for topic partitions.
+ */
+public interface PartitionMetadataClient extends AutoCloseable {
+    /**
+     * Lists the latest offsets for the provided topic partitions.
+     *
+     * @param topicPartitions A set of topic partitions.
+     * @return A map of topic partitions to the CompletableFuture of their latest offsets.
+     */
+    Map<TopicPartition, CompletableFuture<Long>> listLatestOffsets(
+        Set<TopicPartition> topicPartitions
+    );
+}
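
For reference, a minimal in-memory implementation of this interface (an
editorial sketch only, not part of this commit; the class name is
hypothetical) could look like the following. The commit itself wires in a
no-op client in BrokerServer, shown above.

    import org.apache.kafka.common.TopicPartition;

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;

    // Serves pre-computed end offsets; unknown partitions get -1 (uninitialized).
    public class MapBackedPartitionMetadataClient implements PartitionMetadataClient {
        private final Map<TopicPartition, Long> endOffsets;

        public MapBackedPartitionMetadataClient(Map<TopicPartition, Long> endOffsets) {
            this.endOffsets = endOffsets;
        }

        @Override
        public Map<TopicPartition, CompletableFuture<Long>> listLatestOffsets(Set<TopicPartition> topicPartitions) {
            return topicPartitions.stream().collect(Collectors.toMap(
                tp -> tp,
                tp -> CompletableFuture.completedFuture(endOffsets.getOrDefault(tp, -1L))));
        }

        @Override
        public void close() {
        }
    }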
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 01832467b0f..1a4efc6d989 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -128,6 +128,7 @@ import org.mockito.ArgumentMatchers;
 import java.net.InetAddress;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -3714,7 +3715,8 @@ public class GroupCoordinatorServiceTest {
                     .setTopicId(TOPIC_ID)
                    .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
                        .setPartitionIndex(partition)
-                        .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET))))
+                        .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+                        .setLag(PartitionFactory.UNINITIALIZED_LAG))))
             );
 
         CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
@@ -3727,13 +3729,21 @@ public class GroupCoordinatorServiceTest {
     public void testDescribeShareGroupOffsetsWithDefaultPersister() throws InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
         Persister persister = mock(DefaultStatePersister.class);
+
+        PartitionMetadataClient partitionMetadataClient = mock(PartitionMetadataClient.class);
+
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
             .setConfig(createConfig())
             .setRuntime(runtime)
             .setPersister(persister)
+            .setPartitionMetadataClient(partitionMetadataClient)
             .build(true);
         service.startup(() -> 1);
 
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new TopicPartition(TOPIC_NAME, 1)));
+        when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
+            .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1), CompletableFuture.completedFuture(41L)));
+
         int partition = 1;
         DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
             .setGroupId("share-group-id")
@@ -3757,7 +3767,8 @@ public class GroupCoordinatorServiceTest {
                     .setTopicId(TOPIC_ID)
                    .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
                         .setPartitionIndex(partition)
-                        .setStartOffset(21))))
+                        .setStartOffset(21)
+                        .setLag(10L))))
             );
 
        ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData()
@@ -3767,6 +3778,7 @@ public class GroupCoordinatorServiceTest {
                    .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
                         .setPartition(partition)
                         .setStartOffset(21)
+                        .setDeliveryCompleteCount(10)
                         .setStateEpoch(1)))
                 )
             );
@@ -3903,6 +3915,144 @@ public class GroupCoordinatorServiceTest {
         assertFutureThrows(IllegalStateException.class, future, "Result is null for the read state summary");
     }
 
+    @Test
+    public void testDescribeShareGroupOffsetsWithDefaultPersisterReadSummaryPartitionError() throws InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        ReadShareGroupStateSummaryRequestData readShareGroupStateSummaryRequestData = new ReadShareGroupStateSummaryRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData()
+                    .setPartition(partition)))));
+
+        DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+            .setGroupId("share-group-id")
+            .setTopics(
+                List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+                        .setLeaderEpoch(PartitionFactory.DEFAULT_LEADER_EPOCH)
+                        .setLag(PartitionFactory.UNINITIALIZED_LAG))))
+            );
+
+        ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData()
+            .setResults(
+                List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
+                        .setPartition(partition)
+                        .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+                        .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
+                    ))
+                )
+            );
+
+        ReadShareGroupStateSummaryParameters readShareGroupStateSummaryParameters = ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+        ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+        when(persister.readSummary(
+            ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+        )).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
+            service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    @Test
+    public void testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetError() throws InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+
+        PartitionMetadataClient partitionMetadataClient = mock(PartitionMetadataClient.class);
+
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .setPartitionMetadataClient(partitionMetadataClient)
+            .build(true);
+        service.startup(() -> 1);
+
+        Exception ex = new Exception("failure");
+
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new TopicPartition(TOPIC_NAME, 1)));
+        when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
+            .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1), CompletableFuture.failedFuture(ex)));
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        ReadShareGroupStateSummaryRequestData readShareGroupStateSummaryRequestData = new ReadShareGroupStateSummaryRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData()
+                    .setPartition(partition)))));
+
+        DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+            .setGroupId("share-group-id")
+            .setTopics(
+                List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setErrorCode(Errors.forException(ex).code())
+                        .setErrorMessage(ex.getMessage())
+                    ))
+                )
+            );
+
+        ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData()
+            .setResults(
+                List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
+                        .setPartition(partition)
+                        .setStartOffset(21)
+                        .setDeliveryCompleteCount(10)
+                        .setStateEpoch(1)))
+                )
+            );
+
+        ReadShareGroupStateSummaryParameters readShareGroupStateSummaryParameters = ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+        ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+        when(persister.readSummary(
+            ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+        )).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
+            service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
     @Test
     public void testDescribeShareGroupOffsetsCoordinatorNotActive() throws ExecutionException, InterruptedException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
@@ -3964,12 +4114,20 @@ public class GroupCoordinatorServiceTest {
     public void testDescribeShareGroupAllOffsets() throws InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
         Persister persister = mock(DefaultStatePersister.class);
+
+        PartitionMetadataClient partitionMetadataClient = mock(PartitionMetadataClient.class);
+
         GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
             .setConfig(createConfig())
             .setRuntime(runtime)
             .setPersister(persister)
+            .setPartitionMetadataClient(partitionMetadataClient)
             .build(true);
 
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new TopicPartition(TOPIC_NAME, 1)));
+        when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
+            .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1), CompletableFuture.completedFuture(41L)));
+
         MetadataImage image = new MetadataImageBuilder()
             .addTopic(TOPIC_ID, TOPIC_NAME, 3)
             .build();
@@ -4003,7 +4161,8 @@ public class GroupCoordinatorServiceTest {
                     .setTopicId(TOPIC_ID)
                    .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
                        .setPartitionIndex(partition)
-                        .setStartOffset(21))))
+                        .setStartOffset(21)
+                        .setLag(10L))))
             );
 
        ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData()
@@ -4013,6 +4172,7 @@ public class GroupCoordinatorServiceTest {
                    .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
                         .setPartition(partition)
                         .setStartOffset(21)
+                        .setDeliveryCompleteCount(10)
                         .setStateEpoch(1)))
                 )
             );
@@ -4101,6 +4261,162 @@ public class GroupCoordinatorServiceTest {
         assertFutureThrows(IllegalStateException.class, future, "Result is null for the read state summary");
     }
 
+    @Test
+    public void testDescribeShareGroupAllOffsetsReadSummaryPartitionError() throws InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+
+        MetadataImage image = new MetadataImageBuilder()
+            .addTopic(TOPIC_ID, TOPIC_NAME, 3)
+            .build();
+
+        service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
+
+        int partition = 1;
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-initialized-partitions"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID, Set.of(partition))));
+
+        DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+            .setGroupId("share-group-id")
+            .setTopics(null);
+
+        ReadShareGroupStateSummaryRequestData readShareGroupStateSummaryRequestData = new ReadShareGroupStateSummaryRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData()
+                    .setPartition(partition)))));
+
+        DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+            .setGroupId("share-group-id")
+            .setTopics(
+                List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+                        .setLeaderEpoch(PartitionFactory.DEFAULT_LEADER_EPOCH)
+                        .setLag(PartitionFactory.UNINITIALIZED_LAG))))
+            );
+
+        ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData()
+            .setResults(
+                List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
+                        .setPartition(partition)
+                        .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+                        .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
+                    ))
+                )
+            );
+
+        ReadShareGroupStateSummaryParameters readShareGroupStateSummaryParameters = ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+        ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+        when(persister.readSummary(
+            ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+        )).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
+            service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    @Test
+    public void testDescribeShareGroupAllOffsetsLatestOffsetError() throws InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+
+        PartitionMetadataClient partitionMetadataClient = mock(PartitionMetadataClient.class);
+
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .setPartitionMetadataClient(partitionMetadataClient)
+            .build(true);
+
+        Exception ex = new Exception("failure");
+
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new TopicPartition(TOPIC_NAME, 1)));
+        when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
+            .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1), CompletableFuture.failedFuture(ex)));
+
+        MetadataImage image = new MetadataImageBuilder()
+            .addTopic(TOPIC_ID, TOPIC_NAME, 3)
+            .build();
+
+        service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
+
+        int partition = 1;
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-initialized-partitions"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID, Set.of(partition))));
+
+        DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+            .setGroupId("share-group-id")
+            .setTopics(null);
+
+        ReadShareGroupStateSummaryRequestData readShareGroupStateSummaryRequestData = new ReadShareGroupStateSummaryRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData()
+                    .setPartition(partition)))));
+
+        DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+            .setGroupId("share-group-id")
+            .setTopics(
+                List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setErrorCode(Errors.forException(ex).code())
+                        .setErrorMessage(ex.getMessage())
+                    ))
+                )
+            );
+
+        ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData()
+            .setResults(
+                List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
+                        .setPartition(partition)
+                        .setStartOffset(21)
+                        .setDeliveryCompleteCount(10)
+                        .setStateEpoch(1)))
+                )
+            );
+
+        ReadShareGroupStateSummaryParameters readShareGroupStateSummaryParameters = ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+        ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+        when(persister.readSummary(
+            ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+        )).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
+            service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
     @Test
     public void testDescribeShareGroupAllOffsetsCoordinatorNotActive() throws ExecutionException, InterruptedException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
@@ -5650,6 +5966,7 @@ public class GroupCoordinatorServiceTest {
        private GroupCoordinatorMetrics metrics = new GroupCoordinatorMetrics();
         private Persister persister = new NoOpStatePersister();
         private CoordinatorMetadataImage metadataImage = null;
+        private PartitionMetadataClient partitionMetadataClient = null;
 
         GroupCoordinatorService build() {
             return build(false);
@@ -5667,7 +5984,8 @@ public class GroupCoordinatorServiceTest {
                 metrics,
                 configManager,
                 persister,
-                new MockTimer()
+                new MockTimer(),
+                partitionMetadataClient
             );
 
             if (serviceStartup) {
@@ -5703,6 +6021,11 @@ public class GroupCoordinatorServiceTest {
             this.metrics = metrics;
             return this;
         }
+
+        public GroupCoordinatorServiceBuilder setPartitionMetadataClient(PartitionMetadataClient partitionMetadataClient) {
+            this.partitionMetadataClient = partitionMetadataClient;
+            return this;
+        }
     }
 
    private static DeleteShareGroupStateParameters createDeleteShareRequest(String groupId, Uuid topic, List<Integer> partitions) {
diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
index ed998bb0b1f..215e95ff085 100644
--- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
+++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
@@ -28,6 +28,7 @@ public class PartitionFactory {
     public static final int DEFAULT_STATE_EPOCH = 0;
     public static final int UNINITIALIZED_START_OFFSET = -1;
     public static final int UNINITIALIZED_DELIVERY_COMPLETE_COUNT = -1;
+    public static final long UNINITIALIZED_LAG = -1;
     public static final short DEFAULT_ERROR_CODE = Errors.NONE.code();
     public static final int DEFAULT_LEADER_EPOCH = 0;
     public static final String DEFAULT_ERR_MESSAGE = Errors.NONE.message();
