This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8461ae331a0 KAFKA-19800-2: Created an implementation class 
NetworkPartitionMetadataClient for PartitionMetadataClient (#20852)
8461ae331a0 is described below

commit 8461ae331a012e0f2b6e5983991b46276d314452
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Mon Nov 17 18:38:15 2025 +0530

    KAFKA-19800-2: Created an implementation class 
NetworkPartitionMetadataClient for PartitionMetadataClient (#20852)
    
    This PR is part of
    
    
[KIP-1226](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1226%3A+Introducing+Share+Partition+Lag+Persistence+and+Retrieval).
    
    This PR introduces NetworkPartitionMetadataClient, an implementation of
    PartitionMetadataClient that uses a NetworkClient to send a
    ListOffsetsRequest to the destination node. The destination node is the
    leader broker for the partitions in the request and is retrieved from
    the MetadataCache.
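
    A minimal usage sketch of the new client (illustrative fragment only;
    imports are omitted, the topic name is an example, and the
    metadataCache, networkClientSupplier and listenerName values are
    assumed to be supplied by the broker, as wired up in BrokerServer):

        PartitionMetadataClient client = new NetworkPartitionMetadataClient(
            metadataCache, networkClientSupplier, Time.SYSTEM, listenerName);
        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
            client.listLatestOffsets(Set.of(new TopicPartition("topic", 0)));
        futures.forEach((tp, future) -> future.thenAccept(response -> {
            // Errors are reported per partition via OffsetResponse; the futures
            // themselves are not completed exceptionally.
            if (response.error() == Errors.NONE) {
                System.out.println(tp + " end offset: " + response.offset());
            }
        }));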
    
    This new implementation class will later be used in GroupCoordinatorService
    to find the partition end offsets while computing share partition lag for
    the DescribeShareGroupOffsets request.
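
    For reference, a sketch of the lag derivation in GroupCoordinatorService,
    based on the code in this patch (partitionData is the persisted
    share-group state returned by the persister; the end offset is the offset
    of the next record to be produced, so no +1 adjustment is needed):

        PartitionMetadataClient.OffsetResponse offsetResponse =
            partitionLatestOffsets.get(tp).join();
        if (offsetResponse.error() == Errors.NONE) {
            long lag = offsetResponse.offset()
                - partitionData.startOffset()
                - partitionData.deliveryCompleteCount();
        }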
    
    Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
     <[email protected]>
---
 checkstyle/import-control-group-coordinator.xml    |   2 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  45 +-
 .../coordinator/group/GroupCoordinatorService.java |  36 +-
 .../group/NetworkPartitionMetadataClient.java      | 298 +++++++
 .../coordinator/group/PartitionMetadataClient.java |  11 +-
 .../group/GroupCoordinatorServiceTest.java         | 231 +++--
 .../group/NetworkPartitionMetadataClientTest.java  | 972 +++++++++++++++++++++
 7 files changed, 1467 insertions(+), 128 deletions(-)

diff --git a/checkstyle/import-control-group-coordinator.xml 
b/checkstyle/import-control-group-coordinator.xml
index 44b5b020251..0e9aabad4a2 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -52,7 +52,7 @@
     <subpackage name="coordinator">
         <subpackage name="group">
             <allow pkg="net.jpountz.xxhash" />
-            <allow pkg="org.apache.kafka.clients.consumer" />
+            <allow pkg="org.apache.kafka.clients" />
             <allow pkg="org.apache.kafka.common.annotation" />
             <allow pkg="org.apache.kafka.common.config" />
             <allow pkg="org.apache.kafka.common.compress" />
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 5772a2879bc..3cba40491cd 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -36,12 +36,12 @@ import org.apache.kafka.common.utils.{LogContext, Time, 
Utils}
 import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoaderImpl, 
CoordinatorRecord}
 import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, 
GroupCoordinatorRuntimeMetrics}
-import org.apache.kafka.coordinator.group.{GroupConfigManager, 
GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService, 
PartitionMetadataClient}
+import org.apache.kafka.coordinator.group.{GroupConfigManager, 
GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService, 
NetworkPartitionMetadataClient, PartitionMetadataClient}
 import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, 
ShareCoordinatorRuntimeMetrics}
 import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorRecordSerde, ShareCoordinatorService}
 import org.apache.kafka.coordinator.transaction.ProducerIdManager
 import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, 
MetadataPublisher}
-import org.apache.kafka.metadata.{BrokerState, ListenerInfo, 
MetadataVersionConfigValidator}
+import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataCache, 
MetadataVersionConfigValidator}
 import org.apache.kafka.metadata.publisher.{AclPublisher, 
DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
 import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
 import org.apache.kafka.server.authorizer.Authorizer
@@ -96,6 +96,8 @@ class BrokerServer(
 
   private var assignmentsManager: AssignmentsManager = _
 
+  private var partitionMetadataClient: PartitionMetadataClient = _
+
   val lock: ReentrantLock = new ReentrantLock()
   val awaitShutdownCond: Condition = lock.newCondition()
   var status: ProcessStatus = SHUTDOWN
@@ -119,8 +121,6 @@ class BrokerServer(
   var credentialProvider: CredentialProvider = _
   var tokenCache: DelegationTokenCache = _
 
-  var partitionMetadataClient: PartitionMetadataClient = _
-
   @volatile var groupCoordinator: GroupCoordinator = _
 
   var groupConfigManager: GroupConfigManager = _
@@ -373,7 +373,7 @@ class BrokerServer(
       /* create persister */
       persister = createShareStatePersister()
 
-      partitionMetadataClient = createPartitionMetadataClient()
+      partitionMetadataClient = createPartitionMetadataClient(metadataCache)
 
       groupCoordinator = createGroupCoordinator()
 
@@ -624,23 +624,19 @@ class BrokerServer(
     }
   }
 
-  private def createPartitionMetadataClient(): PartitionMetadataClient = {
-    // This is a no-op implementation of PartitionMetadataClient. It always 
returns -1 as the latest offset for any
-    // requested topic partition.
-    // TODO: KAFKA-19800: Implement a real PartitionMetadataClient that can 
fetch latest offsets via InterBrokerSendThread.
-    new PartitionMetadataClient {
-      override def listLatestOffsets(topicPartitions: util.Set[TopicPartition]
-                                    ): util.Map[TopicPartition, 
util.concurrent.CompletableFuture[java.lang.Long]] = {
-        topicPartitions.asScala
-          .map { tp =>
-            tp -> 
CompletableFuture.completedFuture(java.lang.Long.valueOf(-1L))
-          }
-          .toMap
-          .asJava
-      }
-
-      override def close(): Unit = {}
-    }
+  private def createPartitionMetadataClient(metadataCache: MetadataCache): 
PartitionMetadataClient = {
+    new NetworkPartitionMetadataClient(
+      metadataCache,
+      () => NetworkUtils.buildNetworkClient(
+        "NetworkPartitionMetadataClient",
+        config,
+        metrics,
+        Time.SYSTEM,
+        new LogContext(s"[NetworkPartitionMetadataClient 
broker=${config.brokerId}]")
+      ),
+      Time.SYSTEM,
+      config.interBrokerListenerName()
+    )
   }
 
   private def createGroupCoordinator(): GroupCoordinator = {
@@ -823,8 +819,13 @@ class BrokerServer(
 
       if (groupConfigManager != null)
         CoreUtils.swallow(groupConfigManager.close(), this)
+
       if (groupCoordinator != null)
         CoreUtils.swallow(groupCoordinator.shutdown(), this)
+
+      if (partitionMetadataClient != null)
+        CoreUtils.swallow(partitionMetadataClient.close(), this)
+
       if (shareCoordinator != null)
         CoreUtils.swallow(shareCoordinator.shutdown(), this)
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 3ffb2d3e644..afb9f744d04 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -1884,8 +1884,8 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         });
 
         // Fetch latest offsets for all partitions that need lag computation.
-        Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets = 
partitionsToComputeLag.isEmpty() ? Map.of() :
-                
partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
+        Map<TopicPartition, 
CompletableFuture<PartitionMetadataClient.OffsetResponse>> 
partitionLatestOffsets =
+            partitionsToComputeLag.isEmpty() ? Map.of() : 
partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
 
         // Final response object to be built. It will include lag information 
computed from partitionMetadataClient.
         
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup 
responseGroup =
@@ -1898,8 +1898,13 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new 
CompletableFuture<?>[0]))
             .whenComplete((result, error) -> {
                 // The error variable will not be null when one or more of the 
partitionLatestOffsets futures get completed exceptionally.
-                // If that is the case, then the same exception would be 
caught in the try catch executed below when .join() is called.
-                // Thus, we do not need to check error != null here.
+                // As per the handling of these futures in 
NetworkPartitionMetadataClient, this should not happen, because error cases are
+                // handled within the OffsetResponse object for each 
partition. This is just a safety check.
+                if (error != null) {
+                    log.error("Failed to retrieve partition end offsets while 
calculating share partition lag for share group - {}", groupId, error);
+                    responseFuture.completeExceptionally(error);
+                    return;
+                }
                 readSummaryResult.topicsData().forEach(topicData -> {
                     // Build response for each topic.
                     
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic 
topic =
@@ -1924,26 +1929,27 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                                 .setLeaderEpoch(partitionData.errorCode() == 
Errors.NONE.code() ? partitionData.leaderEpoch() : 
PartitionFactory.DEFAULT_LEADER_EPOCH)
                                 .setLag(PartitionFactory.UNINITIALIZED_LAG));
                         } else {
-                            try {
-                                // This code is reached when allOf above is 
complete, which happens when all the
-                                // individual futures are complete. Thus, the 
call to join() here is safe.
-                                long partitionLatestOffset = 
partitionLatestOffsets.get(tp).join();
+                            // This code is reached when allOf above is 
complete, which happens when all the
+                            // individual futures are complete. Thus, the call 
to join() here is safe.
+                            PartitionMetadataClient.OffsetResponse 
offsetResponse = partitionLatestOffsets.get(tp).join();
+                            if (offsetResponse.error().code() != 
Errors.NONE.code()) {
+                                // If there was an error during fetching 
latest offset for a partition, return the error in the response for that 
partition.
+                                log.error("Partition end offset fetch failed 
for topicPartition {}", tp);
+                                partitionResponses.add(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                    
.setPartitionIndex(partitionData.partition())
+                                    
.setErrorCode(offsetResponse.error().code())
+                                    
.setErrorMessage(offsetResponse.error().message()));
+                            } else {
                                 // Compute lag as (partition end offset - 
startOffset - deliveryCompleteCount).
                                 // Note, partition end offset, which is 
retrieved from partitionMetadataClient, is the offset of
                                 // the next message to be produced, not the 
last message offset. Thus, the formula for lag computation
                                 // does not need a +1 adjustment.
-                                long lag = partitionLatestOffset - 
partitionData.startOffset() - partitionData.deliveryCompleteCount();
+                                long lag = offsetResponse.offset() - 
partitionData.startOffset() - partitionData.deliveryCompleteCount();
                                 partitionResponses.add(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
                                     
.setPartitionIndex(partitionData.partition())
                                     
.setStartOffset(partitionData.startOffset())
                                     
.setLeaderEpoch(partitionData.leaderEpoch())
                                     .setLag(lag));
-                            } catch (CompletionException e) {
-                                // If fetching latest offset for a partition 
failed, return the error in the response for that partition.
-                                partitionResponses.add(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                                    
.setPartitionIndex(partitionData.partition())
-                                    
.setErrorCode(Errors.forException(e.getCause()).code())
-                                    
.setErrorMessage(e.getCause().getMessage()));
                             }
                         }
                     });
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java
new file mode 100644
index 00000000000..355b0d2fb21
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+public class NetworkPartitionMetadataClient implements PartitionMetadataClient 
{
+
+    private static final Logger log = 
LoggerFactory.getLogger(NetworkPartitionMetadataClient.class);
+
+    private final MetadataCache metadataCache;
+    private final Supplier<KafkaClient> networkClientSupplier;
+    private final Time time;
+    private final ListenerName listenerName;
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+    private volatile SendThread sendThread;
+
+    public NetworkPartitionMetadataClient(MetadataCache metadataCache,
+                                          Supplier<KafkaClient> 
networkClientSupplier,
+                                          Time time, ListenerName 
listenerName) {
+        this.metadataCache = metadataCache;
+        this.networkClientSupplier = networkClientSupplier;
+        this.time = time;
+        this.listenerName = listenerName;
+    }
+
+    @Override
+    public Map<TopicPartition, CompletableFuture<OffsetResponse>> 
listLatestOffsets(Set<TopicPartition> topicPartitions) {
+        if (topicPartitions == null || topicPartitions.isEmpty()) {
+            return Map.of();
+        }
+
+        // Initialize sendThread lazily on first call
+        ensureSendThreadInitialized();
+
+        // Map to store futures for each TopicPartition
+        Map<TopicPartition, CompletableFuture<OffsetResponse>> futures = new 
HashMap<>();
+        // Group TopicPartitions by leader node
+        Map<Node, List<TopicPartition>> partitionsByNode = new HashMap<>();
+        for (TopicPartition tp : topicPartitions) {
+            // Get leader node for this partition
+            Optional<Node> leaderNodeOpt = 
metadataCache.getPartitionLeaderEndpoint(
+                tp.topic(),
+                tp.partition(),
+                listenerName
+            );
+
+            if (leaderNodeOpt.isEmpty() || leaderNodeOpt.get().isEmpty()) {
+                // No leader available - complete with error
+                futures.put(tp, CompletableFuture.completedFuture(new 
OffsetResponse(-1, Errors.LEADER_NOT_AVAILABLE)));
+                continue;
+            }
+
+            partitionsByNode.computeIfAbsent(leaderNodeOpt.get(), k -> new 
ArrayList<>()).add(tp);
+        }
+
+        // Create and enqueue requests for each node
+        partitionsByNode.forEach((node, partitionsByLeader) -> {
+            // All partitions with the same leader node will be included in 
the same ListOffsetsRequest.
+            Map<TopicPartition, CompletableFuture<OffsetResponse>> 
partitionFuturesByLeader = new HashMap<>();
+            for (TopicPartition tp : partitionsByLeader) {
+                CompletableFuture<OffsetResponse> future = new 
CompletableFuture<>();
+                futures.put(tp, future);
+                partitionFuturesByLeader.put(tp, future);
+            }
+
+            // Create ListOffsetsRequest for this node
+            ListOffsetsRequest.Builder requestBuilder = 
createListOffsetsRequest(partitionsByLeader);
+            // Create pending request to track this request
+            PendingRequest pendingRequest = new PendingRequest(node, 
partitionFuturesByLeader, requestBuilder);
+            // Enqueue to send thread
+            sendThread.enqueue(pendingRequest);
+        });
+
+        return futures;
+    }
+
+    @Override
+    public void close() {
+        // Only close sendThread if it was initialized. Note, close is called 
only during broker shutdown, so there is no need
+        // for further synchronization here.
+        if (!initialized.get()) {
+            return;
+        }
+        if (sendThread != null) {
+            try {
+                sendThread.shutdown();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("Interrupted while shutting down 
NetworkPartitionMetadataClient", e);
+            }
+        }
+    }
+
+    /**
+     * Ensures that the sendThread is initialized. This method is thread-safe 
and will only
+     * initialize the sendThread once, even if called concurrently.
+     */
+    // Visible for testing.
+    void ensureSendThreadInitialized() {
+        if (initialized.compareAndSet(false, true)) {
+            KafkaClient networkClient = networkClientSupplier.get();
+            sendThread = new SendThread(
+                "NetworkPartitionMetadataClientSendThread",
+                networkClient,
+                
Math.toIntExact(CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS),
  // 30 seconds
+                this.time
+            );
+            sendThread.start();
+            log.info("NetworkPartitionMetadataClient sendThread initialized 
and started");
+        }
+    }
+
+    /**
+     * Creates a ListOffsetsRequest Builder for the given partitions 
requesting latest offsets.
+     */
+    private ListOffsetsRequest.Builder 
createListOffsetsRequest(List<TopicPartition> partitions) {
+        Map<String, ListOffsetsTopic> topicsMap = new HashMap<>();
+        partitions.forEach(tp -> {
+            if (!topicsMap.containsKey(tp.topic())) {
+                ListOffsetsTopic topic = new 
ListOffsetsTopic().setName(tp.topic());
+                topicsMap.put(tp.topic(), topic);
+            }
+            ListOffsetsTopic topic = topicsMap.get(tp.topic());
+            topic.partitions().add(
+                new ListOffsetsPartition()
+                    .setPartitionIndex(tp.partition())
+                    .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)
+                    .setCurrentLeaderEpoch(-1) // -1 skips leader epoch validation on the broker
+            );
+        });
+        // Isolation level will always be READ_UNCOMMITTED when finding the 
partition end offset.
+        return ListOffsetsRequest.Builder.forConsumer(
+            true,
+            IsolationLevel.READ_UNCOMMITTED
+        ).setTargetTimes(List.copyOf(topicsMap.values()));
+    }
+
+    /**
+     * Handles the response from a ListOffsets request.
+     */
+    // Visible for testing.
+    void handleResponse(Map<TopicPartition, CompletableFuture<OffsetResponse>> 
partitionFutures, ClientResponse clientResponse) {
+        // Handle error responses first
+        if (maybeHandleErrorResponse(partitionFutures, clientResponse)) {
+            return;
+        }
+
+        log.debug("ListOffsets response received successfully - {}", 
clientResponse);
+        ListOffsetsResponse response = (ListOffsetsResponse) 
clientResponse.responseBody();
+
+        for (ListOffsetsTopicResponse topicResponse : response.topics()) {
+            String topicName = topicResponse.name();
+            for (ListOffsetsPartitionResponse partitionResponse : 
topicResponse.partitions()) {
+                TopicPartition tp = new TopicPartition(topicName, 
partitionResponse.partitionIndex());
+                // Get the corresponding future from the map and complete it.
+                CompletableFuture<OffsetResponse> future = 
partitionFutures.get(tp);
+                if (future != null) {
+                    future.complete(new 
OffsetResponse(partitionResponse.offset(), 
Errors.forCode(partitionResponse.errorCode())));
+                }
+            }
+        }
+
+        partitionFutures.forEach((tp, future) -> {
+            // If the future is not completed yet, then the topic-partition was not
included in the response; complete it with an error.
+            if (!future.isDone()) {
+                future.complete(new OffsetResponse(-1, 
Errors.UNKNOWN_TOPIC_OR_PARTITION));
+            }
+        });
+    }
+
+    /**
+     * Handles error responses by completing all associated futures with an 
error. Returns true if an error was
+     * handled. Otherwise, returns false.
+     */
+    private boolean maybeHandleErrorResponse(Map<TopicPartition, 
CompletableFuture<OffsetResponse>> partitionFutures, ClientResponse 
clientResponse) {
+        Errors error;
+        if (clientResponse == null) {
+            log.error("Response for ListOffsets for topicPartitions: {} is 
null", partitionFutures.keySet());
+            error = Errors.UNKNOWN_SERVER_ERROR;
+        } else if (clientResponse.authenticationException() != null) {
+            log.error("Authentication exception", 
clientResponse.authenticationException());
+            error = Errors.UNKNOWN_SERVER_ERROR;
+        } else if (clientResponse.versionMismatch() != null) {
+            log.error("Version mismatch exception", 
clientResponse.versionMismatch());
+            error = Errors.UNKNOWN_SERVER_ERROR;
+        } else if (clientResponse.wasDisconnected()) {
+            log.error("Response for ListOffsets for TopicPartitions: {} was 
disconnected - {}.", partitionFutures.keySet(), clientResponse);
+            error = Errors.NETWORK_EXCEPTION;
+        } else if (clientResponse.wasTimedOut()) {
+            log.error("Response for ListOffsets for TopicPartitions: {} timed 
out - {}.", partitionFutures.keySet(), clientResponse);
+            error = Errors.REQUEST_TIMED_OUT;
+        } else if (!clientResponse.hasResponse()) {
+            log.error("Response for ListOffsets for TopicPartitions: {} has no 
response - {}.", partitionFutures.keySet(), clientResponse);
+            error = Errors.UNKNOWN_SERVER_ERROR;
+        } else {
+            // No error to handle; return false.
+            return false;
+        }
+
+        partitionFutures.forEach((tp, future) -> future.complete(new 
OffsetResponse(-1, error)));
+        return true;
+    }
+
+    /**
+     * Tracks a pending ListOffsets request and its associated futures.
+     */
+    private record PendingRequest(Node node,
+                                  Map<TopicPartition, 
CompletableFuture<OffsetResponse>> futures,
+                                  ListOffsetsRequest.Builder requestBuilder) {
+    }
+
+    private class SendThread extends InterBrokerSendThread {
+        private final ConcurrentLinkedQueue<PendingRequest> pendingRequests = 
new ConcurrentLinkedQueue<>();
+
+        protected SendThread(String name, KafkaClient networkClient, int 
requestTimeoutMs, Time time) {
+            super(name, networkClient, requestTimeoutMs, time);
+        }
+
+        /**
+         * Enqueues a pending request to be sent.
+         */
+        public void enqueue(PendingRequest pendingRequest) {
+            pendingRequests.add(pendingRequest);
+            wakeup();
+        }
+
+        @Override
+        public Collection<RequestAndCompletionHandler> generateRequests() {
+            List<RequestAndCompletionHandler> requests = new ArrayList<>();
+
+            // Process all pending requests
+            PendingRequest pending;
+            while ((pending = pendingRequests.poll()) != null) {
+                final PendingRequest current = pending;
+                ListOffsetsRequest.Builder requestBuilder = 
current.requestBuilder;
+
+                // Create completion handler
+                RequestAndCompletionHandler requestHandler = new 
RequestAndCompletionHandler(
+                    time.hiResClockMs(),
+                    current.node,
+                    requestBuilder,
+                    response -> handleResponse(current.futures, response)
+                );
+
+                requests.add(requestHandler);
+            }
+
+            return requests;
+        }
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java
index ea6d940143f..cf67e9ceb48 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
 
 import java.util.Map;
 import java.util.Set;
@@ -32,7 +33,15 @@ public interface PartitionMetadataClient extends 
AutoCloseable {
      * @param topicPartitions A set of topic partitions.
      * @return A map of topic partitions to the completableFuture of their 
latest offsets
      */
-    Map<TopicPartition, CompletableFuture<Long>> listLatestOffsets(
+    Map<TopicPartition, CompletableFuture<OffsetResponse>> listLatestOffsets(
         Set<TopicPartition> topicPartitions
     );
+
+    /**
+     * A record to hold the offset and any associated error.
+     */
+    record OffsetResponse(
+        long offset,
+        Errors error
+    ) { }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 1a4efc6d989..10faf16b3a8 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -36,6 +36,7 @@ import 
org.apache.kafka.common.errors.StreamsInvalidTopologyEpochException;
 import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
 import org.apache.kafka.common.errors.StreamsTopologyFencedException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
@@ -151,6 +152,7 @@ import static 
org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -3742,7 +3744,8 @@ public class GroupCoordinatorServiceTest {
 
         Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new 
TopicPartition(TOPIC_NAME, 1)));
         when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
-            .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1), 
CompletableFuture.completedFuture(41L)));
+            .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1),
+                CompletableFuture.completedFuture(new 
PartitionMetadataClient.OffsetResponse(41L, Errors.NONE))));
 
         int partition = 1;
         
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup 
requestData = new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
@@ -3980,7 +3983,7 @@ public class GroupCoordinatorServiceTest {
     }
 
     @Test
-    public void 
testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetError() throws 
InterruptedException, ExecutionException {
+    public void 
testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetThrowsError() 
throws InterruptedException, ExecutionException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
         Persister persister = mock(DefaultStatePersister.class);
 
@@ -3994,7 +3997,7 @@ public class GroupCoordinatorServiceTest {
             .build(true);
         service.startup(() -> 1);
 
-        Exception ex = new Exception("failure");
+        Exception ex = new UnknownServerException("failure");
 
         Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new 
TopicPartition(TOPIC_NAME, 1)));
         when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
@@ -4015,6 +4018,67 @@ public class GroupCoordinatorServiceTest {
                 .setPartitions(List.of(new 
ReadShareGroupStateSummaryRequestData.PartitionData()
                     .setPartition(partition)))));
 
+        ReadShareGroupStateSummaryResponseData 
readShareGroupStateSummaryResponseData = new 
ReadShareGroupStateSummaryResponseData()
+            .setResults(
+                List.of(new 
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
+                        .setPartition(partition)
+                        .setStartOffset(21)
+                        .setDeliveryCompleteCount(10)
+                        .setStateEpoch(1)))
+                )
+            );
+
+        ReadShareGroupStateSummaryParameters 
readShareGroupStateSummaryParameters = 
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+        ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = 
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+        when(persister.readSummary(
+            ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+        
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+        
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
 future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        CompletionException responseException = 
assertThrows(CompletionException.class, future::join);
+        assertInstanceOf(UnknownServerException.class, 
responseException.getCause());
+        assertEquals("failure", responseException.getCause().getMessage());
+    }
+
+    @Test
+    public void 
testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetReturnsError() 
throws InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+
+        PartitionMetadataClient partitionMetadataClient = 
mock(PartitionMetadataClient.class);
+
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .setPartitionMetadataClient(partitionMetadataClient)
+            .build(true);
+        service.startup(() -> 1);
+
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new 
TopicPartition(TOPIC_NAME, 1)));
+        when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
+            .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1),
+                CompletableFuture.completedFuture(new 
PartitionMetadataClient.OffsetResponse(-1, Errors.NETWORK_EXCEPTION))));
+
+        int partition = 1;
+        
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup 
requestData = new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        ReadShareGroupStateSummaryRequestData 
readShareGroupStateSummaryRequestData = new 
ReadShareGroupStateSummaryRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new 
ReadShareGroupStateSummaryRequestData.PartitionData()
+                    .setPartition(partition)))));
+
         
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup 
responseData = new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
             .setGroupId("share-group-id")
             .setTopics(
@@ -4023,8 +4087,8 @@ public class GroupCoordinatorServiceTest {
                     .setTopicId(TOPIC_ID)
                     .setPartitions(List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
                         .setPartitionIndex(partition)
-                        .setErrorCode(Errors.forException(ex).code())
-                        .setErrorMessage(ex.getMessage())
+                        .setErrorCode(Errors.NETWORK_EXCEPTION.code())
+                        .setErrorMessage(Errors.NETWORK_EXCEPTION.message())
                     ))
                 )
             );
@@ -4053,6 +4117,77 @@ public class GroupCoordinatorServiceTest {
         assertEquals(responseData, future.get());
     }
 
+    @Test
+    public void testDescribeShareGroupAllOffsetsLatestOffsetError() throws 
InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+
+        PartitionMetadataClient partitionMetadataClient = 
mock(PartitionMetadataClient.class);
+
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .setPartitionMetadataClient(partitionMetadataClient)
+            .build(true);
+
+        Exception ex = new UnknownServerException("failure");
+
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new 
TopicPartition(TOPIC_NAME, 1)));
+        when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
+            .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1), 
CompletableFuture.failedFuture(ex)));
+
+        MetadataImage image = new MetadataImageBuilder()
+            .addTopic(TOPIC_ID, TOPIC_NAME, 3)
+            .build();
+
+        service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), 
null);
+
+        int partition = 1;
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-initialized-partitions"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID, 
Set.of(partition))));
+
+        
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup 
requestData = new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+            .setGroupId("share-group-id")
+            .setTopics(null);
+
+        ReadShareGroupStateSummaryRequestData 
readShareGroupStateSummaryRequestData = new 
ReadShareGroupStateSummaryRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new 
ReadShareGroupStateSummaryRequestData.PartitionData()
+                    .setPartition(partition)))));
+
+        ReadShareGroupStateSummaryResponseData 
readShareGroupStateSummaryResponseData = new 
ReadShareGroupStateSummaryResponseData()
+            .setResults(
+                List.of(new 
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
+                        .setPartition(partition)
+                        .setStartOffset(21)
+                        .setDeliveryCompleteCount(10)
+                        .setStateEpoch(1)))
+                )
+            );
+
+        ReadShareGroupStateSummaryParameters 
readShareGroupStateSummaryParameters = 
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+        ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = 
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+        when(persister.readSummary(
+            ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+        
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+        
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
 future =
+            
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        CompletionException responseException = 
assertThrows(CompletionException.class, future::join);
+        assertInstanceOf(UnknownServerException.class, 
responseException.getCause());
+        assertEquals("failure", responseException.getCause().getMessage());
+    }
+
     @Test
     public void testDescribeShareGroupOffsetsCoordinatorNotActive() throws 
ExecutionException, InterruptedException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
@@ -4126,7 +4261,8 @@ public class GroupCoordinatorServiceTest {
 
         Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new 
TopicPartition(TOPIC_NAME, 1)));
         when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
-            .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1), 
CompletableFuture.completedFuture(41L)));
+            .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1),
+                CompletableFuture.completedFuture(new 
PartitionMetadataClient.OffsetResponse(41L, Errors.NONE))));
 
         MetadataImage image = new MetadataImageBuilder()
             .addTopic(TOPIC_ID, TOPIC_NAME, 3)
@@ -4334,89 +4470,6 @@ public class GroupCoordinatorServiceTest {
         assertEquals(responseData, future.get());
     }
 
-    @Test
-    public void testDescribeShareGroupAllOffsetsLatestOffsetError() throws 
InterruptedException, ExecutionException {
-        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
-        Persister persister = mock(DefaultStatePersister.class);
-
-        PartitionMetadataClient partitionMetadataClient = 
mock(PartitionMetadataClient.class);
-
-        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
-            .setConfig(createConfig())
-            .setRuntime(runtime)
-            .setPersister(persister)
-            .setPartitionMetadataClient(partitionMetadataClient)
-            .build(true);
-
-        Exception ex = new Exception("failure");
-
-        Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new 
TopicPartition(TOPIC_NAME, 1)));
-        when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
-            .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1), 
CompletableFuture.failedFuture(ex)));
-
-        MetadataImage image = new MetadataImageBuilder()
-            .addTopic(TOPIC_ID, TOPIC_NAME, 3)
-            .build();
-
-        service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), 
null);
-
-        int partition = 1;
-
-        when(runtime.scheduleReadOperation(
-            ArgumentMatchers.eq("share-group-initialized-partitions"),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.any()
-        )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID, 
Set.of(partition))));
-
-        
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup 
requestData = new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
-            .setGroupId("share-group-id")
-            .setTopics(null);
-
-        ReadShareGroupStateSummaryRequestData 
readShareGroupStateSummaryRequestData = new 
ReadShareGroupStateSummaryRequestData()
-            .setGroupId("share-group-id")
-            .setTopics(List.of(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
-                .setTopicId(TOPIC_ID)
-                .setPartitions(List.of(new 
ReadShareGroupStateSummaryRequestData.PartitionData()
-                    .setPartition(partition)))));
-
-        
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup 
responseData = new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
-            .setGroupId("share-group-id")
-            .setTopics(
-                List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                    .setTopicName(TOPIC_NAME)
-                    .setTopicId(TOPIC_ID)
-                    .setPartitions(List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                        .setPartitionIndex(partition)
-                        .setErrorCode(Errors.forException(ex).code())
-                        .setErrorMessage(ex.getMessage())
-                    ))
-                )
-            );
-
-        ReadShareGroupStateSummaryResponseData 
readShareGroupStateSummaryResponseData = new 
ReadShareGroupStateSummaryResponseData()
-            .setResults(
-                List.of(new 
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
-                    .setTopicId(TOPIC_ID)
-                    .setPartitions(List.of(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
-                        .setPartition(partition)
-                        .setStartOffset(21)
-                        .setDeliveryCompleteCount(10)
-                        .setStateEpoch(1)))
-                )
-            );
-
-        ReadShareGroupStateSummaryParameters 
readShareGroupStateSummaryParameters = 
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
-        ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = 
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
-        when(persister.readSummary(
-            ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
-        
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
-
-        
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
 future =
-            
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
-
-        assertEquals(responseData, future.get());
-    }
-
     @Test
     public void testDescribeShareGroupAllOffsetsCoordinatorNotActive() throws 
ExecutionException, InterruptedException {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClientTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClientTest.java
new file mode 100644
index 00000000000..49e4bf8c783
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClientTest.java
@@ -0,0 +1,972 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.util.MockTime;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class NetworkPartitionMetadataClientTest {
+    private static final MockTime MOCK_TIME = new MockTime();
+    private static final MetadataCache METADATA_CACHE = 
mock(MetadataCache.class);
+    private static final Supplier<KafkaClient> KAFKA_CLIENT_SUPPLIER = () -> 
mock(KafkaClient.class);
+    private static final String HOST = "localhost";
+    private static final int PORT = 9092;
+    private static final ListenerName LISTENER_NAME = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
+    private static final String TOPIC = "test-topic";
+    private static final int PARTITION = 0;
+    private static final Node LEADER_NODE = new Node(1, HOST, PORT);
+
+    private NetworkPartitionMetadataClient networkPartitionMetadataClient;
+
+    private static class NetworkPartitionMetadataClientBuilder {
+        private MetadataCache metadataCache = METADATA_CACHE;
+        private Supplier<KafkaClient> kafkaClientSupplier = 
KAFKA_CLIENT_SUPPLIER;
+
+        NetworkPartitionMetadataClientBuilder withMetadataCache(MetadataCache 
metadataCache) {
+            this.metadataCache = metadataCache;
+            return this;
+        }
+
+        NetworkPartitionMetadataClientBuilder 
withKafkaClientSupplier(Supplier<KafkaClient> kafkaClientSupplier) {
+            this.kafkaClientSupplier = kafkaClientSupplier;
+            return this;
+        }
+
+        static NetworkPartitionMetadataClientBuilder builder() {
+            return new NetworkPartitionMetadataClientBuilder();
+        }
+
+        NetworkPartitionMetadataClient build() {
+            return new NetworkPartitionMetadataClient(metadataCache, 
kafkaClientSupplier, MOCK_TIME, LISTENER_NAME);
+        }
+    }
+
+    @BeforeEach
+    public void setUp() {
+        networkPartitionMetadataClient = null;
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (networkPartitionMetadataClient != null) {
+            try {
+                networkPartitionMetadataClient.close();
+            } catch (Exception e) {
+                fail("Failed to close NetworkPartitionMetadataClient", e);
+            }
+        }
+    }
+
+    @Test
+    public void testListLatestOffsetsSuccess() throws ExecutionException, 
InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        long expectedOffset = 100L;
+        MetadataCache metadataCache = mock(MetadataCache.class);
+        MockClient client = new MockClient(MOCK_TIME);
+
+        // Mock metadata cache to return leader node
+        when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION, 
LISTENER_NAME))
+            .thenReturn(Optional.of(LEADER_NODE));
+
+        // Prepare response for ListOffsets request
+        client.prepareResponseFrom(body -> {
+            if (body instanceof ListOffsetsRequest request) {
+                ListOffsetsTopic requestTopic = request.data().topics().get(0);
+                return requestTopic.name().equals(TOPIC) &&
+                    requestTopic.partitions().get(0).partitionIndex() == 
PARTITION;
+            }
+            return false;
+        }, new ListOffsetsResponse(
+            new org.apache.kafka.common.message.ListOffsetsResponseData()
+                .setTopics(List.of(
+                    new ListOffsetsTopicResponse()
+                        .setName(TOPIC)
+                        .setPartitions(List.of(
+                            new ListOffsetsPartitionResponse()
+                                .setPartitionIndex(PARTITION)
+                                .setErrorCode(Errors.NONE.code())
+                                .setOffset(expectedOffset)
+                                .setTimestamp(System.currentTimeMillis())
+                        ))
+                ))
+        ), LEADER_NODE);
+
+        networkPartitionMetadataClient = 
NetworkPartitionMetadataClientBuilder.builder()
+            .withMetadataCache(metadataCache)
+            .withKafkaClientSupplier(() -> client)
+            .build();
+
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp);
+
+        Map<TopicPartition, 
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+            networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+        assertNotNull(futures);
+        assertEquals(1, futures.size());
+        assertTrue(futures.containsKey(tp));
+
+        PartitionMetadataClient.OffsetResponse response = 
futures.get(tp).get();
+        assertTrue(futures.get(tp).isDone() && 
!futures.get(tp).isCompletedExceptionally());
+        assertNotNull(response);
+        assertEquals(Errors.NONE.code(), response.error().code());
+        assertEquals(expectedOffset, response.offset());
+    }
+
+    @Test
+    public void testListLatestOffsetsEmptyPartitionLeader() throws 
ExecutionException, InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        MetadataCache metadataCache = mock(MetadataCache.class);
+
+        // Mock metadata cache to return an empty leader endpoint
+        when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION, 
LISTENER_NAME))
+            .thenReturn(Optional.empty());
+
+        networkPartitionMetadataClient = 
NetworkPartitionMetadataClientBuilder.builder()
+            .withMetadataCache(metadataCache)
+            .build();
+
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp);
+
+        Map<TopicPartition, 
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+            networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+        assertNotNull(futures);
+        assertEquals(1, futures.size());
+        assertTrue(futures.containsKey(tp));
+
+        PartitionMetadataClient.OffsetResponse response = 
futures.get(tp).get();
+        assertTrue(futures.get(tp).isDone() && 
!futures.get(tp).isCompletedExceptionally());
+        assertNotNull(response);
+        assertEquals(Errors.LEADER_NOT_AVAILABLE.code(), 
response.error().code());
+    }
+
+    @Test
+    public void testListLatestOffsetsNoNodePartitionLeader() throws 
ExecutionException, InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        MetadataCache metadataCache = mock(MetadataCache.class);
+
+        // Mock metadata cache to return Node.noNode() as the leader
+        when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION, LISTENER_NAME))
+            .thenReturn(Optional.of(Node.noNode()));
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withMetadataCache(metadataCache)
+            .build();
+
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp);
+
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+            networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+        assertNotNull(futures);
+        assertEquals(1, futures.size());
+        assertTrue(futures.containsKey(tp));
+
+        PartitionMetadataClient.OffsetResponse response = futures.get(tp).get();
+        assertTrue(futures.get(tp).isDone() && !futures.get(tp).isCompletedExceptionally());
+        assertNotNull(response);
+        assertEquals(Errors.LEADER_NOT_AVAILABLE.code(), response.error().code());
+    }
+
+    @Test
+    public void testListLatestOffsetsNullResponseBody() throws ExecutionException, InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        MetadataCache metadataCache = mock(MetadataCache.class);
+        MockClient client = new MockClient(MOCK_TIME);
+
+        // Mock metadata cache to return leader node
+        when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION, LISTENER_NAME))
+            .thenReturn(Optional.of(LEADER_NODE));
+
+        // Prepare null response
+        client.prepareResponseFrom(body -> {
+            if (body instanceof ListOffsetsRequest) {
+                ListOffsetsRequest request = (ListOffsetsRequest) body;
+                ListOffsetsTopic requestTopic = request.data().topics().get(0);
+                return requestTopic.name().equals(TOPIC) &&
+                    requestTopic.partitions().get(0).partitionIndex() == PARTITION;
+            }
+            return false;
+        }, null, LEADER_NODE);
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withMetadataCache(metadataCache)
+            .withKafkaClientSupplier(() -> client)
+            .build();
+
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp);
+
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+            networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+        assertNotNull(futures);
+        assertEquals(1, futures.size());
+        assertTrue(futures.containsKey(tp));
+
+        PartitionMetadataClient.OffsetResponse response = futures.get(tp).get();
+        assertTrue(futures.get(tp).isDone() && !futures.get(tp).isCompletedExceptionally());
+        assertNotNull(response);
+        assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), response.error().code());
+    }
+
+    @Test
+    public void testListLatestOffsetsNullResponse() throws ExecutionException, InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        CompletableFuture<PartitionMetadataClient.OffsetResponse> partitionFuture = new CompletableFuture<>();
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+            tp,
+            partitionFuture
+        );
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder().build();
+        // Pass null as clientResponse.
+        networkPartitionMetadataClient.handleResponse(futures, null);
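+        // A null ClientResponse is surfaced as UNKNOWN_SERVER_ERROR on the partition future.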
+        assertTrue(partitionFuture.isDone() && !partitionFuture.isCompletedExceptionally());
+        PartitionMetadataClient.OffsetResponse response = partitionFuture.get();
+        assertEquals(-1, response.offset());
+        assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), response.error().code());
+    }
+
+    @Test
+    public void testListLatestOffsetsAuthenticationError() throws ExecutionException, InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        CompletableFuture<PartitionMetadataClient.OffsetResponse> partitionFuture = new CompletableFuture<>();
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+            tp,
+            partitionFuture
+        );
+        AuthenticationException authenticationException = new AuthenticationException("Test authentication exception");
+        ClientResponse clientResponse = mock(ClientResponse.class);
+        // Mock authentication exception in client response.
+        when(clientResponse.authenticationException()).thenReturn(authenticationException);
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder().build();
+        networkPartitionMetadataClient.handleResponse(futures, clientResponse);
+        assertTrue(partitionFuture.isDone() && !partitionFuture.isCompletedExceptionally());
+        PartitionMetadataClient.OffsetResponse response = partitionFuture.get();
+        assertEquals(-1, response.offset());
+        assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), response.error().code());
+    }
+
+    @Test
+    public void testListLatestOffsetsVersionMismatch() throws ExecutionException, InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        CompletableFuture<PartitionMetadataClient.OffsetResponse> partitionFuture = new CompletableFuture<>();
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+            tp,
+            partitionFuture
+        );
+        UnsupportedVersionException unsupportedVersionException = new UnsupportedVersionException("Test unsupportedVersionException exception");
+        ClientResponse clientResponse = mock(ClientResponse.class);
+        when(clientResponse.authenticationException()).thenReturn(null);
+        // Mock version mismatch exception in client response.
+        when(clientResponse.versionMismatch()).thenReturn(unsupportedVersionException);
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder().build();
+        networkPartitionMetadataClient.handleResponse(futures, clientResponse);
+        assertTrue(partitionFuture.isDone() && !partitionFuture.isCompletedExceptionally());
+        PartitionMetadataClient.OffsetResponse response = partitionFuture.get();
+        assertEquals(-1, response.offset());
+        assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), response.error().code());
+    }
+
+    @Test
+    public void testListLatestOffsetsDisconnected() throws ExecutionException, InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        CompletableFuture<PartitionMetadataClient.OffsetResponse> partitionFuture = new CompletableFuture<>();
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+            tp,
+            partitionFuture
+        );
+        ClientResponse clientResponse = mock(ClientResponse.class);
+        when(clientResponse.authenticationException()).thenReturn(null);
+        when(clientResponse.versionMismatch()).thenReturn(null);
+        // Mock disconnected in client response.
+        when(clientResponse.wasDisconnected()).thenReturn(true);
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder().build();
+        networkPartitionMetadataClient.handleResponse(futures, clientResponse);
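+        // A disconnected response completes the future normally with NETWORK_EXCEPTION.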
+        assertTrue(partitionFuture.isDone() && !partitionFuture.isCompletedExceptionally());
+        PartitionMetadataClient.OffsetResponse response = partitionFuture.get();
+        assertEquals(-1, response.offset());
+        assertEquals(Errors.NETWORK_EXCEPTION.code(), response.error().code());
+    }
+
+    @Test
+    public void testListLatestOffsetsTimedOut() throws ExecutionException, InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        CompletableFuture<PartitionMetadataClient.OffsetResponse> partitionFuture = new CompletableFuture<>();
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+            tp,
+            partitionFuture
+        );
+        ClientResponse clientResponse = mock(ClientResponse.class);
+        when(clientResponse.authenticationException()).thenReturn(null);
+        when(clientResponse.versionMismatch()).thenReturn(null);
+        when(clientResponse.wasDisconnected()).thenReturn(false);
+        // Mock timed out in client response.
+        when(clientResponse.wasTimedOut()).thenReturn(true);
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder().build();
+        networkPartitionMetadataClient.handleResponse(futures, clientResponse);
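+        // A timed-out response completes the future normally with REQUEST_TIMED_OUT.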
+        assertTrue(partitionFuture.isDone() && !partitionFuture.isCompletedExceptionally());
+        PartitionMetadataClient.OffsetResponse response = partitionFuture.get();
+        assertEquals(-1, response.offset());
+        assertEquals(Errors.REQUEST_TIMED_OUT.code(), response.error().code());
+    }
+
+    @Test
+    public void testListLatestOffsetsMultiplePartitionsSameLeader() throws ExecutionException, InterruptedException {
+        TopicPartition tp1 = new TopicPartition(TOPIC, PARTITION);
+        TopicPartition tp2 = new TopicPartition(TOPIC, 1);
+        long expectedOffset1 = 100L;
+        long expectedOffset2 = 200L;
+
+        MetadataCache metadataCache = mock(MetadataCache.class);
+        MockClient client = new MockClient(MOCK_TIME);
+
+        // Mock metadata cache to return same leader for both partitions
+        when(metadataCache.getPartitionLeaderEndpoint(TOPIC, 0, LISTENER_NAME))
+            .thenReturn(Optional.of(LEADER_NODE));
+        when(metadataCache.getPartitionLeaderEndpoint(TOPIC, 1, LISTENER_NAME))
+            .thenReturn(Optional.of(LEADER_NODE));
+
+        // Prepare response for ListOffsets request with both partitions
+        client.prepareResponseFrom(body -> {
+            if (body instanceof ListOffsetsRequest) {
+                ListOffsetsRequest request = (ListOffsetsRequest) body;
+                ListOffsetsTopic requestTopic = request.data().topics().get(0);
+                return requestTopic.name().equals(TOPIC) &&
+                    requestTopic.partitions().size() == 2;
+            }
+            return false;
+        }, new ListOffsetsResponse(
+            new org.apache.kafka.common.message.ListOffsetsResponseData()
+                .setTopics(List.of(
+                    new ListOffsetsTopicResponse()
+                        .setName(TOPIC)
+                        .setPartitions(List.of(
+                            new ListOffsetsPartitionResponse()
+                                .setPartitionIndex(0)
+                                .setErrorCode(Errors.NONE.code())
+                                .setOffset(expectedOffset1)
+                                .setTimestamp(System.currentTimeMillis()),
+                            new ListOffsetsPartitionResponse()
+                                .setPartitionIndex(1)
+                                .setErrorCode(Errors.NONE.code())
+                                .setOffset(expectedOffset2)
+                                .setTimestamp(System.currentTimeMillis())
+                        ))
+                ))
+        ), LEADER_NODE);
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withMetadataCache(metadataCache)
+            .withKafkaClientSupplier(() -> client)
+            .build();
+
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp1);
+        partitions.add(tp2);
+
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+            networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+        assertNotNull(futures);
+        assertEquals(2, futures.size());
+        assertTrue(futures.containsKey(tp1));
+        assertTrue(futures.containsKey(tp2));
+
+        PartitionMetadataClient.OffsetResponse response1 = futures.get(tp1).get();
+        assertTrue(futures.get(tp1).isDone() && !futures.get(tp1).isCompletedExceptionally());
+        assertNotNull(response1);
+        assertEquals(Errors.NONE.code(), response1.error().code());
+        assertEquals(expectedOffset1, response1.offset());
+
+        PartitionMetadataClient.OffsetResponse response2 = futures.get(tp2).get();
+        assertTrue(futures.get(tp2).isDone() && !futures.get(tp2).isCompletedExceptionally());
+        assertNotNull(response2);
+        assertEquals(Errors.NONE.code(), response2.error().code());
+        assertEquals(expectedOffset2, response2.offset());
+    }
+
+    @Test
+    public void testListLatestOffsetsMultiplePartitionsDifferentLeaders() throws ExecutionException, InterruptedException {
+        TopicPartition tp1 = new TopicPartition(TOPIC, 0);
+        TopicPartition tp2 = new TopicPartition(TOPIC, 1);
+        Node leaderNode1 = LEADER_NODE;
+        Node leaderNode2 = new Node(2, HOST, PORT + 1);
+        long expectedOffset1 = 100L;
+        long expectedOffset2 = 200L;
+
+        MetadataCache metadataCache = mock(MetadataCache.class);
+        MockClient client = new MockClient(MOCK_TIME);
+
+        // Mock metadata cache to return different leaders
+        when(metadataCache.getPartitionLeaderEndpoint(TOPIC, 0, LISTENER_NAME))
+            .thenReturn(Optional.of(leaderNode1));
+        when(metadataCache.getPartitionLeaderEndpoint(TOPIC, 1, LISTENER_NAME))
+            .thenReturn(Optional.of(leaderNode2));
+
+        // Prepare response for first leader
+        client.prepareResponseFrom(body -> {
+            if (body instanceof ListOffsetsRequest) {
+                ListOffsetsRequest request = (ListOffsetsRequest) body;
+                ListOffsetsTopic requestTopic = request.data().topics().get(0);
+                return requestTopic.name().equals(TOPIC) &&
+                    requestTopic.partitions().size() == 1 &&
+                    requestTopic.partitions().get(0).partitionIndex() == 0;
+            }
+            return false;
+        }, new ListOffsetsResponse(
+            new org.apache.kafka.common.message.ListOffsetsResponseData()
+                .setTopics(List.of(
+                    new ListOffsetsTopicResponse()
+                        .setName(TOPIC)
+                        .setPartitions(List.of(
+                            new ListOffsetsPartitionResponse()
+                                .setPartitionIndex(0)
+                                .setErrorCode(Errors.NONE.code())
+                                .setOffset(expectedOffset1)
+                                .setTimestamp(System.currentTimeMillis())
+                        ))
+                ))
+        ), leaderNode1);
+
+        // Prepare response for second leader
+        client.prepareResponseFrom(body -> {
+            if (body instanceof ListOffsetsRequest) {
+                ListOffsetsRequest request = (ListOffsetsRequest) body;
+                ListOffsetsTopic requestTopic = request.data().topics().get(0);
+                return requestTopic.name().equals(TOPIC) &&
+                    requestTopic.partitions().size() == 1 &&
+                    requestTopic.partitions().get(0).partitionIndex() == 1;
+            }
+            return false;
+        }, new ListOffsetsResponse(
+            new org.apache.kafka.common.message.ListOffsetsResponseData()
+                .setTopics(List.of(
+                    new ListOffsetsTopicResponse()
+                        .setName(TOPIC)
+                        .setPartitions(List.of(
+                            new ListOffsetsPartitionResponse()
+                                .setPartitionIndex(1)
+                                .setErrorCode(Errors.NONE.code())
+                                .setOffset(expectedOffset2)
+                                .setTimestamp(System.currentTimeMillis())
+                        ))
+                ))
+        ), leaderNode2);
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withMetadataCache(metadataCache)
+            .withKafkaClientSupplier(() -> client)
+            .build();
+
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp1);
+        partitions.add(tp2);
+
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+            networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+        assertNotNull(futures);
+        assertEquals(2, futures.size());
+        assertTrue(futures.containsKey(tp1));
+        assertTrue(futures.containsKey(tp2));
+
+        PartitionMetadataClient.OffsetResponse response1 = futures.get(tp1).get();
+        assertTrue(futures.get(tp1).isDone() && !futures.get(tp1).isCompletedExceptionally());
+        assertNotNull(response1);
+        assertEquals(Errors.NONE.code(), response1.error().code());
+        assertEquals(expectedOffset1, response1.offset());
+
+        PartitionMetadataClient.OffsetResponse response2 = futures.get(tp2).get();
+        assertTrue(futures.get(tp2).isDone() && !futures.get(tp2).isCompletedExceptionally());
+        assertNotNull(response2);
+        assertEquals(Errors.NONE.code(), response2.error().code());
+        assertEquals(expectedOffset2, response2.offset());
+    }
+
+    @Test
+    public void testListLatestOffsetsMultipleTopics() throws ExecutionException, InterruptedException {
+        String topic1 = TOPIC;
+        String topic2 = "test-topic-2";
+        TopicPartition tp1 = new TopicPartition(topic1, 0);
+        TopicPartition tp2 = new TopicPartition(topic2, 0);
+        long expectedOffset1 = 100L;
+        long expectedOffset2 = 200L;
+
+        MetadataCache metadataCache = mock(MetadataCache.class);
+        MockClient client = new MockClient(MOCK_TIME);
+
+        // Mock metadata cache to return same leader for both topics
+        when(metadataCache.getPartitionLeaderEndpoint(topic1, 0, LISTENER_NAME))
+            .thenReturn(Optional.of(LEADER_NODE));
+        when(metadataCache.getPartitionLeaderEndpoint(topic2, 0, LISTENER_NAME))
+            .thenReturn(Optional.of(LEADER_NODE));
+
+        // Prepare response for ListOffsets request with both topics
+        client.prepareResponseFrom(body -> {
+            if (body instanceof ListOffsetsRequest) {
+                ListOffsetsRequest request = (ListOffsetsRequest) body;
+                return request.data().topics().size() == 2;
+            }
+            return false;
+        }, new ListOffsetsResponse(
+            new org.apache.kafka.common.message.ListOffsetsResponseData()
+                .setTopics(List.of(
+                    new ListOffsetsTopicResponse()
+                        .setName(topic1)
+                        .setPartitions(List.of(
+                            new ListOffsetsPartitionResponse()
+                                .setPartitionIndex(0)
+                                .setErrorCode(Errors.NONE.code())
+                                .setOffset(expectedOffset1)
+                                .setTimestamp(System.currentTimeMillis())
+                        )),
+                    new ListOffsetsTopicResponse()
+                        .setName(topic2)
+                        .setPartitions(List.of(
+                            new ListOffsetsPartitionResponse()
+                                .setPartitionIndex(0)
+                                .setErrorCode(Errors.NONE.code())
+                                .setOffset(expectedOffset2)
+                                .setTimestamp(System.currentTimeMillis())
+                        ))
+                ))
+        ), LEADER_NODE);
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withMetadataCache(metadataCache)
+            .withKafkaClientSupplier(() -> client)
+            .build();
+
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp1);
+        partitions.add(tp2);
+
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+            networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+        assertNotNull(futures);
+        assertEquals(2, futures.size());
+        assertTrue(futures.containsKey(tp1));
+        assertTrue(futures.containsKey(tp2));
+
+        PartitionMetadataClient.OffsetResponse response1 = futures.get(tp1).get();
+        assertTrue(futures.get(tp1).isDone() && !futures.get(tp1).isCompletedExceptionally());
+        assertNotNull(response1);
+        assertEquals(Errors.NONE.code(), response1.error().code());
+        assertEquals(expectedOffset1, response1.offset());
+
+        PartitionMetadataClient.OffsetResponse response2 = futures.get(tp2).get();
+        assertTrue(futures.get(tp2).isDone() && !futures.get(tp2).isCompletedExceptionally());
+        assertNotNull(response2);
+        assertEquals(Errors.NONE.code(), response2.error().code());
+        assertEquals(expectedOffset2, response2.offset());
+    }
+
+    @Test
+    public void testListLatestOffsetsNullPartitions() {
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder().build();
+
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+            networkPartitionMetadataClient.listLatestOffsets(null);
+
+        assertNotNull(futures);
+        assertTrue(futures.isEmpty());
+    }
+
+    @Test
+    public void testListLatestOffsetsEmptyPartitions() {
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder().build();
+
+        Set<TopicPartition> partitions = new HashSet<>();
+
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+            networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+        assertNotNull(futures);
+        assertTrue(futures.isEmpty());
+    }
+
+    @Test
+    public void testListLatestOffsetsServerError() throws ExecutionException, InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        MetadataCache metadataCache = mock(MetadataCache.class);
+        MockClient client = new MockClient(MOCK_TIME);
+
+        // Mock metadata cache to return leader node
+        when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION, LISTENER_NAME))
+            .thenReturn(Optional.of(LEADER_NODE));
+
+        // Prepare error response
+        client.prepareResponseFrom(body -> {
+            if (body instanceof ListOffsetsRequest) {
+                ListOffsetsRequest request = (ListOffsetsRequest) body;
+                ListOffsetsTopic requestTopic = request.data().topics().get(0);
+                return requestTopic.name().equals(TOPIC) &&
+                    requestTopic.partitions().get(0).partitionIndex() == PARTITION;
+            }
+            return false;
+        }, new ListOffsetsResponse(
+            new org.apache.kafka.common.message.ListOffsetsResponseData()
+                .setTopics(List.of(
+                    new ListOffsetsTopicResponse()
+                        .setName(TOPIC)
+                        .setPartitions(List.of(
+                            new ListOffsetsPartitionResponse()
+                                .setPartitionIndex(PARTITION)
+                                .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                        ))
+                ))
+        ), LEADER_NODE);
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withMetadataCache(metadataCache)
+            .withKafkaClientSupplier(() -> client)
+            .build();
+
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp);
+
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+            networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+        assertNotNull(futures);
+        assertEquals(1, futures.size());
+        assertTrue(futures.containsKey(tp));
+
+        PartitionMetadataClient.OffsetResponse response = futures.get(tp).get();
+        assertTrue(futures.get(tp).isDone() && !futures.get(tp).isCompletedExceptionally());
+        assertNotNull(response);
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), response.error().code());
+    }
+
+    @Test
+    public void testListLatestOffsetsMissingPartitionInResponse() throws ExecutionException, InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        MetadataCache metadataCache = mock(MetadataCache.class);
+        MockClient client = new MockClient(MOCK_TIME);
+
+        // Mock metadata cache to return leader node
+        when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION, LISTENER_NAME))
+            .thenReturn(Optional.of(LEADER_NODE));
+
+        // Prepare response without the requested partition
+        client.prepareResponseFrom(body -> {
+            if (body instanceof ListOffsetsRequest) {
+                ListOffsetsRequest request = (ListOffsetsRequest) body;
+                ListOffsetsTopic requestTopic = request.data().topics().get(0);
+                return requestTopic.name().equals(TOPIC) &&
+                    requestTopic.partitions().get(0).partitionIndex() == PARTITION;
+            }
+            return false;
+        }, new ListOffsetsResponse(
+            new org.apache.kafka.common.message.ListOffsetsResponseData()
+                .setTopics(List.of(
+                    new ListOffsetsTopicResponse()
+                        .setName(TOPIC)
+                        .setPartitions(List.of())
+                ))
+        ), LEADER_NODE);
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withMetadataCache(metadataCache)
+            .withKafkaClientSupplier(() -> client)
+            .build();
+
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp);
+
+        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+            networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+        assertNotNull(futures);
+        assertEquals(1, futures.size());
+        assertTrue(futures.containsKey(tp));
+
+        PartitionMetadataClient.OffsetResponse response = futures.get(tp).get();
+        assertTrue(futures.get(tp).isDone() && !futures.get(tp).isCompletedExceptionally());
+        assertNotNull(response);
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), response.error().code());
+    }
+
+    @Test
+    public void testClose() {
+        KafkaClient client = mock(KafkaClient.class);
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withKafkaClientSupplier(() -> client)
+            .build();
+        try {
+            verify(client, times(0)).close();
+            // Ensure send thread is initialized.
+            networkPartitionMetadataClient.ensureSendThreadInitialized();
+            networkPartitionMetadataClient.close();
+            // KafkaClient is closed when NetworkPartitionMetadataClient is closed.
+            verify(client, times(1)).close();
+        } catch (Exception e) {
+            fail("unexpected exception", e);
+        }
+    }
+
+    @Test
+    public void testLazyInitialization() {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        long expectedOffset = 100L;
+        MetadataCache metadataCache = mock(MetadataCache.class);
+        MockClient client = new MockClient(MOCK_TIME);
+
+        // Track if client supplier was called
+        final boolean[] supplierCalled = {false};
+        Supplier<KafkaClient> kafkaClientSupplier = () -> {
+            supplierCalled[0] = true;
+            return client;
+        };
+
+        // Mock metadata cache to return leader node
+        when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION, LISTENER_NAME))
+            .thenReturn(Optional.of(LEADER_NODE));
+
+        // Prepare response for ListOffsets request
+        client.prepareResponseFrom(body -> {
+            if (body instanceof ListOffsetsRequest request) {
+                ListOffsetsTopic requestTopic = request.data().topics().get(0);
+                return requestTopic.name().equals(TOPIC) &&
+                    requestTopic.partitions().get(0).partitionIndex() == PARTITION;
+            }
+            return false;
+        }, new ListOffsetsResponse(
+            new org.apache.kafka.common.message.ListOffsetsResponseData()
+                .setTopics(List.of(
+                    new ListOffsetsTopicResponse()
+                        .setName(TOPIC)
+                        .setPartitions(List.of(
+                            new ListOffsetsPartitionResponse()
+                                .setPartitionIndex(PARTITION)
+                                .setErrorCode(Errors.NONE.code())
+                                .setOffset(expectedOffset)
+                                .setTimestamp(System.currentTimeMillis())
+                        ))
+                ))
+        ), LEADER_NODE);
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withMetadataCache(metadataCache)
+            .withKafkaClientSupplier(kafkaClientSupplier)
+            .build();
+
+        // Verify supplier was not called before listLatestOffsets
+        assertFalse(supplierCalled[0]);
+
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp);
+
+        networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+        // Verify supplier was called during listLatestOffsets
+        assertTrue(supplierCalled[0]);
+    }
+
+    @Test
+    public void testCloseWithoutInitialization() throws IOException {
+        KafkaClient client = mock(KafkaClient.class);
+        final boolean[] supplierCalled = {false};
+        Supplier<KafkaClient> clientSupplier = () -> {
+            supplierCalled[0] = true;
+            return client;
+        };
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withKafkaClientSupplier(clientSupplier)
+            .build();
+
+        // Close without calling listLatestOffsets
+        networkPartitionMetadataClient.close();
+
+        // Verify supplier was never called
+        assertFalse(supplierCalled[0]);
+        // Verify client.close() was never called since sendThread was never initialized
+        verify(client, never()).close();
+    }
+
+    @Test
+    public void testLazyInitializationWithEmptyPartitions() {
+        MetadataCache metadataCache = mock(MetadataCache.class);
+        final boolean[] supplierCalled = {false};
+        Supplier<KafkaClient> clientSupplier = () -> {
+            supplierCalled[0] = true;
+            return mock(KafkaClient.class);
+        };
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withMetadataCache(metadataCache)
+            .withKafkaClientSupplier(clientSupplier)
+            .build();
+
+        // Call listLatestOffsets with empty partitions
+        networkPartitionMetadataClient.listLatestOffsets(new HashSet<>());
+
+        // Verify supplier was not called since no partitions were provided.
+        assertFalse(supplierCalled[0]);
+    }
+
+    @Test
+    public void testLazyInitializationWithNullPartitions() {
+        MetadataCache metadataCache = mock(MetadataCache.class);
+        final boolean[] supplierCalled = {false};
+        Supplier<KafkaClient> clientSupplier = () -> {
+            supplierCalled[0] = true;
+            return mock(KafkaClient.class);
+        };
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withMetadataCache(metadataCache)
+            .withKafkaClientSupplier(clientSupplier)
+            .build();
+
+        // Call listLatestOffsets with null partitions
+        networkPartitionMetadataClient.listLatestOffsets(null);
+
+        // Verify supplier was not called since no partitions were provided.
+        assertFalse(supplierCalled[0]);
+    }
+
+    @Test
+    public void testLazyInitializationOnlyOnce() {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        long expectedOffset = 100L;
+        MetadataCache metadataCache = mock(MetadataCache.class);
+        MockClient client = new MockClient(MOCK_TIME);
+
+        // Track how many times supplier was called
+        final int[] supplierCallCount = {0};
+        Supplier<KafkaClient> clientSupplier = () -> {
+            supplierCallCount[0]++;
+            return client;
+        };
+
+        // Mock metadata cache to return leader node
+        when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION, LISTENER_NAME))
+            .thenReturn(Optional.of(LEADER_NODE));
+
+        // Prepare multiple responses for multiple calls
+        client.prepareResponseFrom(body -> {
+            if (body instanceof ListOffsetsRequest request) {
+                ListOffsetsTopic requestTopic = request.data().topics().get(0);
+                return requestTopic.name().equals(TOPIC) &&
+                    requestTopic.partitions().get(0).partitionIndex() == PARTITION;
+            }
+            return false;
+        }, new ListOffsetsResponse(
+            new org.apache.kafka.common.message.ListOffsetsResponseData()
+                .setTopics(List.of(
+                    new ListOffsetsTopicResponse()
+                        .setName(TOPIC)
+                        .setPartitions(List.of(
+                            new ListOffsetsPartitionResponse()
+                                .setPartitionIndex(PARTITION)
+                                .setErrorCode(Errors.NONE.code())
+                                .setOffset(expectedOffset)
+                                .setTimestamp(System.currentTimeMillis())
+                        ))
+                ))
+        ), LEADER_NODE);
+
+        client.prepareResponseFrom(body -> {
+            if (body instanceof ListOffsetsRequest request) {
+                ListOffsetsTopic requestTopic = request.data().topics().get(0);
+                return requestTopic.name().equals(TOPIC) &&
+                    requestTopic.partitions().get(0).partitionIndex() == PARTITION;
+            }
+            return false;
+        }, new ListOffsetsResponse(
+            new org.apache.kafka.common.message.ListOffsetsResponseData()
+                .setTopics(List.of(
+                    new ListOffsetsTopicResponse()
+                        .setName(TOPIC)
+                        .setPartitions(List.of(
+                            new ListOffsetsPartitionResponse()
+                                .setPartitionIndex(PARTITION)
+                                .setErrorCode(Errors.NONE.code())
+                                .setOffset(expectedOffset + 1)
+                                .setTimestamp(System.currentTimeMillis())
+                        ))
+                ))
+        ), LEADER_NODE);
+
+        networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+            .withMetadataCache(metadataCache)
+            .withKafkaClientSupplier(clientSupplier)
+            .build();
+
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp);
+
+        // First call to listLatestOffsets
+        networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+        // Verify supplier was called once
+        assertEquals(1, supplierCallCount[0]);
+
+        // Second call to listLatestOffsets
+        networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+        // Verify supplier was still only called once (not again)
+        assertEquals(1, supplierCallCount[0]);
+    }
+}

