This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8461ae331a0 KAFKA-19800-2: Created an implementation class
NetworkPartitionMetadataClient for PartitionMetadataClient (#20852)
8461ae331a0 is described below
commit 8461ae331a012e0f2b6e5983991b46276d314452
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Mon Nov 17 18:38:15 2025 +0530
KAFKA-19800-2: Created an implementation class
NetworkPartitionMetadataClient for PartitionMetadataClient (#20852)
This PR is part of
[KIP-1226](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1226%3A+Introducing+Share+Partition+Lag+Persistence+and+Retrieval).
It introduces NetworkPartitionMetadataClient, an implementation of
PartitionMetadataClient that uses a NetworkClient to send a
ListOffsetsRequest to the destination node. The destination node is the
leader broker for the partitions in the request and is retrieved from the
MetadataCache.
This new implementation class will later be used in GroupCoordinatorService
to find partition end offsets while computing share partition lag for the
DescribeShareGroupOffsets request.
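A rough usage sketch (not part of the commit; the variable names and the
logging are illustrative only, and the usual org.apache.kafka imports are
assumed) of how a caller consumes the new return type:

    Set<TopicPartition> partitions = Set.of(new TopicPartition("topic", 0));
    Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
        partitionMetadataClient.listLatestOffsets(partitions);
    futures.forEach((tp, future) -> future.thenAccept(response -> {
        if (response.error() != Errors.NONE) {
            // Per-partition failures are surfaced via OffsetResponse, not exceptions.
            log.warn("Could not fetch end offset for {}: {}", tp, response.error());
        } else {
            // Partition end offset (offset of the next record to be produced).
            long endOffset = response.offset();
        }
    }));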
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
<[email protected]>
---
checkstyle/import-control-group-coordinator.xml | 2 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 45 +-
.../coordinator/group/GroupCoordinatorService.java | 36 +-
.../group/NetworkPartitionMetadataClient.java | 298 +++++++
.../coordinator/group/PartitionMetadataClient.java | 11 +-
.../group/GroupCoordinatorServiceTest.java | 231 +++--
.../group/NetworkPartitionMetadataClientTest.java | 972 +++++++++++++++++++++
7 files changed, 1467 insertions(+), 128 deletions(-)
diff --git a/checkstyle/import-control-group-coordinator.xml
b/checkstyle/import-control-group-coordinator.xml
index 44b5b020251..0e9aabad4a2 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -52,7 +52,7 @@
<subpackage name="coordinator">
<subpackage name="group">
<allow pkg="net.jpountz.xxhash" />
- <allow pkg="org.apache.kafka.clients.consumer" />
+ <allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.compress" />
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 5772a2879bc..3cba40491cd 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -36,12 +36,12 @@ import org.apache.kafka.common.utils.{LogContext, Time,
Utils}
import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoaderImpl,
CoordinatorRecord}
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics,
GroupCoordinatorRuntimeMetrics}
-import org.apache.kafka.coordinator.group.{GroupConfigManager,
GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService,
PartitionMetadataClient}
+import org.apache.kafka.coordinator.group.{GroupConfigManager,
GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService,
NetworkPartitionMetadataClient, PartitionMetadataClient}
import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics,
ShareCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorRecordSerde, ShareCoordinatorService}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker,
MetadataPublisher}
-import org.apache.kafka.metadata.{BrokerState, ListenerInfo,
MetadataVersionConfigValidator}
+import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataCache,
MetadataVersionConfigValidator}
import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.authorizer.Authorizer
@@ -96,6 +96,8 @@ class BrokerServer(
private var assignmentsManager: AssignmentsManager = _
+ private var partitionMetadataClient: PartitionMetadataClient = _
+
val lock: ReentrantLock = new ReentrantLock()
val awaitShutdownCond: Condition = lock.newCondition()
var status: ProcessStatus = SHUTDOWN
@@ -119,8 +121,6 @@ class BrokerServer(
var credentialProvider: CredentialProvider = _
var tokenCache: DelegationTokenCache = _
- var partitionMetadataClient: PartitionMetadataClient = _
-
@volatile var groupCoordinator: GroupCoordinator = _
var groupConfigManager: GroupConfigManager = _
@@ -373,7 +373,7 @@ class BrokerServer(
/* create persister */
persister = createShareStatePersister()
- partitionMetadataClient = createPartitionMetadataClient()
+ partitionMetadataClient = createPartitionMetadataClient(metadataCache)
groupCoordinator = createGroupCoordinator()
@@ -624,23 +624,19 @@ class BrokerServer(
}
}
- private def createPartitionMetadataClient(): PartitionMetadataClient = {
- // This is a no-op implementation of PartitionMetadataClient. It always
returns -1 as the latest offset for any
- // requested topic partition.
- // TODO: KAFKA-19800: Implement a real PartitionMetadataClient that can
fetch latest offsets via InterBrokerSendThread.
- new PartitionMetadataClient {
- override def listLatestOffsets(topicPartitions: util.Set[TopicPartition]
- ): util.Map[TopicPartition,
util.concurrent.CompletableFuture[java.lang.Long]] = {
- topicPartitions.asScala
- .map { tp =>
- tp ->
CompletableFuture.completedFuture(java.lang.Long.valueOf(-1L))
- }
- .toMap
- .asJava
- }
-
- override def close(): Unit = {}
- }
+ private def createPartitionMetadataClient(metadataCache: MetadataCache):
PartitionMetadataClient = {
+ new NetworkPartitionMetadataClient(
+ metadataCache,
+ () => NetworkUtils.buildNetworkClient(
+ "NetworkPartitionMetadataClient",
+ config,
+ metrics,
+ Time.SYSTEM,
+ new LogContext(s"[NetworkPartitionMetadataClient
broker=${config.brokerId}]")
+ ),
+ Time.SYSTEM,
+ config.interBrokerListenerName()
+ )
}
private def createGroupCoordinator(): GroupCoordinator = {
@@ -823,8 +819,13 @@ class BrokerServer(
if (groupConfigManager != null)
CoreUtils.swallow(groupConfigManager.close(), this)
+
if (groupCoordinator != null)
CoreUtils.swallow(groupCoordinator.shutdown(), this)
+
+ if (partitionMetadataClient != null)
+ CoreUtils.swallow(partitionMetadataClient.close(), this)
+
if (shareCoordinator != null)
CoreUtils.swallow(shareCoordinator.shutdown(), this)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 3ffb2d3e644..afb9f744d04 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -1884,8 +1884,8 @@ public class GroupCoordinatorService implements
GroupCoordinator {
});
// Fetch latest offsets for all partitions that need lag computation.
- Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets =
partitionsToComputeLag.isEmpty() ? Map.of() :
-
partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
+ Map<TopicPartition,
CompletableFuture<PartitionMetadataClient.OffsetResponse>>
partitionLatestOffsets =
+ partitionsToComputeLag.isEmpty() ? Map.of() :
partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
// Final response object to be built. It will include lag information
computed from partitionMetadataClient.
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup
responseGroup =
@@ -1898,8 +1898,13 @@ public class GroupCoordinatorService implements
GroupCoordinator {
CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new
CompletableFuture<?>[0]))
.whenComplete((result, error) -> {
// The error variable will not be null when one or more of the
partitionLatestOffsets futures get completed exceptionally.
- // If that is the case, then the same exception would be
caught in the try catch executed below when .join() is called.
- // Thus, we do not need to check error != null here.
+ // As per the handling of these futures in
NetworkPartitionMetadataClient, this should not happen, as error cases are
+ // handled within the OffsetResponse object for each
partition. This is just a safety check.
+ if (error != null) {
+ log.error("Failed to retrieve partition end offsets while
calculating share partitions lag for share group - {}", groupId, error);
+ responseFuture.completeExceptionally(error);
+ return;
+ }
readSummaryResult.topicsData().forEach(topicData -> {
// Build response for each topic.
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic
topic =
@@ -1924,26 +1929,27 @@ public class GroupCoordinatorService implements
GroupCoordinator {
.setLeaderEpoch(partitionData.errorCode() ==
Errors.NONE.code() ? partitionData.leaderEpoch() :
PartitionFactory.DEFAULT_LEADER_EPOCH)
.setLag(PartitionFactory.UNINITIALIZED_LAG));
} else {
- try {
- // This code is reached when allOf above is
complete, which happens when all the
- // individual futures are complete. Thus, the
call to join() here is safe.
- long partitionLatestOffset =
partitionLatestOffsets.get(tp).join();
+ // This code is reached when allOf above is
complete, which happens when all the
+ // individual futures are complete. Thus, the call
to join() here is safe.
+ PartitionMetadataClient.OffsetResponse
offsetResponse = partitionLatestOffsets.get(tp).join();
+ if (offsetResponse.error().code() !=
Errors.NONE.code()) {
+ // If there was an error while fetching the latest offset for a
partition, return the error in the response for that
partition.
+ log.error("Partition end offset fetch failed
for topicPartition {}", tp);
+ partitionResponses.add(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+
.setPartitionIndex(partitionData.partition())
+
.setErrorCode(offsetResponse.error().code())
+
.setErrorMessage(offsetResponse.error().message()));
+ } else {
// Compute lag as (partition end offset -
startOffset - deliveryCompleteCount).
// Note, partition end offset, which is
retrieved from partitionMetadataClient, is the offset of
// the next message to be produced, not the
last message offset. Thus, the formula for lag computation
// does not need a +1 adjustment.
- long lag = partitionLatestOffset -
partitionData.startOffset() - partitionData.deliveryCompleteCount();
+ long lag = offsetResponse.offset() -
partitionData.startOffset() - partitionData.deliveryCompleteCount();
partitionResponses.add(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partition())
.setStartOffset(partitionData.startOffset())
.setLeaderEpoch(partitionData.leaderEpoch())
.setLag(lag));
- } catch (CompletionException e) {
- // If fetching latest offset for a partition
failed, return the error in the response for that partition.
- partitionResponses.add(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-
.setPartitionIndex(partitionData.partition())
-
.setErrorCode(Errors.forException(e.getCause()).code())
-
.setErrorMessage(e.getCause().getMessage()));
}
}
});
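As a worked example of the lag formula above (using values that appear in the
updated tests later in this patch): with a partition end offset of 41, a start
offset of 21 and a deliveryCompleteCount of 10, the reported lag is
41 - 21 - 10 = 10.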
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java
new file mode 100644
index 00000000000..355b0d2fb21
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
+import
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+public class NetworkPartitionMetadataClient implements PartitionMetadataClient
{
+
+ private static final Logger log =
LoggerFactory.getLogger(NetworkPartitionMetadataClient.class);
+
+ private final MetadataCache metadataCache;
+ private final Supplier<KafkaClient> networkClientSupplier;
+ private final Time time;
+ private final ListenerName listenerName;
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
+ private volatile SendThread sendThread;
+
+ public NetworkPartitionMetadataClient(MetadataCache metadataCache,
+ Supplier<KafkaClient>
networkClientSupplier,
+ Time time, ListenerName
listenerName) {
+ this.metadataCache = metadataCache;
+ this.networkClientSupplier = networkClientSupplier;
+ this.time = time;
+ this.listenerName = listenerName;
+ }
+
+ @Override
+ public Map<TopicPartition, CompletableFuture<OffsetResponse>>
listLatestOffsets(Set<TopicPartition> topicPartitions) {
+ if (topicPartitions == null || topicPartitions.isEmpty()) {
+ return Map.of();
+ }
+
+ // Initialize sendThread lazily on first call
+ ensureSendThreadInitialized();
+
+ // Map to store futures for each TopicPartition
+ Map<TopicPartition, CompletableFuture<OffsetResponse>> futures = new
HashMap<>();
+ // Group TopicPartitions by leader node
+ Map<Node, List<TopicPartition>> partitionsByNode = new HashMap<>();
+ for (TopicPartition tp : topicPartitions) {
+ // Get leader node for this partition
+ Optional<Node> leaderNodeOpt =
metadataCache.getPartitionLeaderEndpoint(
+ tp.topic(),
+ tp.partition(),
+ listenerName
+ );
+
+ if (leaderNodeOpt.isEmpty() || leaderNodeOpt.get().isEmpty()) {
+ // No leader available - complete with error
+ futures.put(tp, CompletableFuture.completedFuture(new
OffsetResponse(-1, Errors.LEADER_NOT_AVAILABLE)));
+ continue;
+ }
+
+ partitionsByNode.computeIfAbsent(leaderNodeOpt.get(), k -> new
ArrayList<>()).add(tp);
+ }
+
+ // Create and enqueue requests for each node
+ partitionsByNode.forEach((node, partitionsByLeader) -> {
+ // All partitions with the same leader node will be included in
the same ListOffsetsRequest.
+ Map<TopicPartition, CompletableFuture<OffsetResponse>>
partitionFuturesByLeader = new HashMap<>();
+ for (TopicPartition tp : partitionsByLeader) {
+ CompletableFuture<OffsetResponse> future = new
CompletableFuture<>();
+ futures.put(tp, future);
+ partitionFuturesByLeader.put(tp, future);
+ }
+
+ // Create ListOffsetsRequest for this node
+ ListOffsetsRequest.Builder requestBuilder =
createListOffsetsRequest(partitionsByLeader);
+ // Create pending request to track this request
+ PendingRequest pendingRequest = new PendingRequest(node,
partitionFuturesByLeader, requestBuilder);
+ // Enqueue to send thread
+ sendThread.enqueue(pendingRequest);
+ });
+
+ return futures;
+ }
+
+ @Override
+ public void close() {
+ // Only close sendThread if it was initialized. Note, close is called only during broker shutdown,
+ // so there is no need for further synchronization here.
+ if (!initialized.get()) {
+ return;
+ }
+ if (sendThread != null) {
+ try {
+ sendThread.shutdown();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("Interrupted while shutting down
NetworkPartitionMetadataClient", e);
+ }
+ }
+ }
+
+ /**
+ * Ensures that the sendThread is initialized. This method is thread-safe
and will only
+ * initialize the sendThread once, even if called concurrently.
+ */
+ // Visible for testing.
+ void ensureSendThreadInitialized() {
+ if (initialized.compareAndSet(false, true)) {
+ KafkaClient networkClient = networkClientSupplier.get();
+ sendThread = new SendThread(
+ "NetworkPartitionMetadataClientSendThread",
+ networkClient,
+
Math.toIntExact(CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS),
//30 seconds
+ this.time
+ );
+ sendThread.start();
+ log.info("NetworkPartitionMetadataClient sendThread initialized
and started");
+ }
+ }
+
+ /**
+ * Creates a ListOffsetsRequest Builder for the given partitions
requesting latest offsets.
+ */
+ private ListOffsetsRequest.Builder
createListOffsetsRequest(List<TopicPartition> partitions) {
+ Map<String, ListOffsetsTopic> topicsMap = new HashMap<>();
+ partitions.forEach(tp -> {
+ if (!topicsMap.containsKey(tp.topic())) {
+ ListOffsetsTopic topic = new
ListOffsetsTopic().setName(tp.topic());
+ topicsMap.put(tp.topic(), topic);
+ }
+ ListOffsetsTopic topic = topicsMap.get(tp.topic());
+ topic.partitions().add(
+ new ListOffsetsPartition()
+ .setPartitionIndex(tp.partition())
+ .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)
+ .setCurrentLeaderEpoch(-1) // Will be set by broker if
needed
+ );
+ });
+ // Isolation level will always be READ_UNCOMMITTED when finding the
partition end offset.
+ return ListOffsetsRequest.Builder.forConsumer(
+ true,
+ IsolationLevel.READ_UNCOMMITTED
+ ).setTargetTimes(List.copyOf(topicsMap.values()));
+ }
+
+ /**
+ * Handles the response from a ListOffsets request.
+ */
+ // Visible for Testing.
+ void handleResponse(Map<TopicPartition, CompletableFuture<OffsetResponse>>
partitionFutures, ClientResponse clientResponse) {
+ // Handle error responses first
+ if (maybeHandleErrorResponse(partitionFutures, clientResponse)) {
+ return;
+ }
+
+ log.debug("ListOffsets response received successfully - {}",
clientResponse);
+ ListOffsetsResponse response = (ListOffsetsResponse)
clientResponse.responseBody();
+
+ for (ListOffsetsTopicResponse topicResponse : response.topics()) {
+ String topicName = topicResponse.name();
+ for (ListOffsetsPartitionResponse partitionResponse :
topicResponse.partitions()) {
+ TopicPartition tp = new TopicPartition(topicName,
partitionResponse.partitionIndex());
+ // Get the corresponding future from the map and complete it.
+ CompletableFuture<OffsetResponse> future =
partitionFutures.get(tp);
+ if (future != null) {
+ future.complete(new
OffsetResponse(partitionResponse.offset(),
Errors.forCode(partitionResponse.errorCode())));
+ }
+ }
+ }
+
+ partitionFutures.forEach((tp, future) -> {
+ // If the future is not completed yet, the topic-partition was not
included in the response; complete it with an error.
+ if (!future.isDone()) {
+ future.complete(new OffsetResponse(-1,
Errors.UNKNOWN_TOPIC_OR_PARTITION));
+ }
+ });
+ }
+
+ /**
+ * Handles error responses by completing all associated futures with an
error. Returns true if an error was
+ * handled. Otherwise, returns false.
+ */
+ private boolean maybeHandleErrorResponse(Map<TopicPartition,
CompletableFuture<OffsetResponse>> partitionFutures, ClientResponse
clientResponse) {
+ Errors error;
+ if (clientResponse == null) {
+ log.error("Response for ListOffsets for topicPartitions: {} is
null", partitionFutures.keySet());
+ error = Errors.UNKNOWN_SERVER_ERROR;
+ } else if (clientResponse.authenticationException() != null) {
+ log.error("Authentication exception",
clientResponse.authenticationException());
+ error = Errors.UNKNOWN_SERVER_ERROR;
+ } else if (clientResponse.versionMismatch() != null) {
+ log.error("Version mismatch exception",
clientResponse.versionMismatch());
+ error = Errors.UNKNOWN_SERVER_ERROR;
+ } else if (clientResponse.wasDisconnected()) {
+ log.error("Response for ListOffsets for TopicPartitions: {} was
disconnected - {}.", partitionFutures.keySet(), clientResponse);
+ error = Errors.NETWORK_EXCEPTION;
+ } else if (clientResponse.wasTimedOut()) {
+ log.error("Response for ListOffsets for TopicPartitions: {} timed
out - {}.", partitionFutures.keySet(), clientResponse);
+ error = Errors.REQUEST_TIMED_OUT;
+ } else if (!clientResponse.hasResponse()) {
+ log.error("Response for ListOffsets for TopicPartitions: {} has no
response - {}.", partitionFutures.keySet(), clientResponse);
+ error = Errors.UNKNOWN_SERVER_ERROR;
+ } else {
+ // No error to handle; return false.
+ return false;
+ }
+
+ partitionFutures.forEach((tp, future) -> future.complete(new
OffsetResponse(-1, error)));
+ return true;
+ }
+
+ /**
+ * Tracks a pending ListOffsets request and its associated futures.
+ */
+ private record PendingRequest(Node node,
+ Map<TopicPartition,
CompletableFuture<OffsetResponse>> futures,
+ ListOffsetsRequest.Builder requestBuilder) {
+ }
+
+ private class SendThread extends InterBrokerSendThread {
+ private final ConcurrentLinkedQueue<PendingRequest> pendingRequests =
new ConcurrentLinkedQueue<>();
+
+ protected SendThread(String name, KafkaClient networkClient, int
requestTimeoutMs, Time time) {
+ super(name, networkClient, requestTimeoutMs, time);
+ }
+
+ /**
+ * Enqueues a pending request to be sent.
+ */
+ public void enqueue(PendingRequest pendingRequest) {
+ pendingRequests.add(pendingRequest);
+ wakeup();
+ }
+
+ @Override
+ public Collection<RequestAndCompletionHandler> generateRequests() {
+ List<RequestAndCompletionHandler> requests = new ArrayList<>();
+
+ // Process all pending requests
+ PendingRequest pending;
+ while ((pending = pendingRequests.poll()) != null) {
+ final PendingRequest current = pending;
+ ListOffsetsRequest.Builder requestBuilder =
current.requestBuilder;
+
+ // Create completion handler
+ RequestAndCompletionHandler requestHandler = new
RequestAndCompletionHandler(
+ time.hiResClockMs(),
+ current.node,
+ requestBuilder,
+ response -> handleResponse(current.futures, response)
+ );
+
+ requests.add(requestHandler);
+ }
+
+ return requests;
+ }
+ }
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java
index ea6d940143f..cf67e9ceb48 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/PartitionMetadataClient.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
import java.util.Map;
import java.util.Set;
@@ -32,7 +33,15 @@ public interface PartitionMetadataClient extends
AutoCloseable {
* @param topicPartitions A set of topic partitions.
* @return A map of topic partitions to the completableFuture of their
latest offsets
*/
- Map<TopicPartition, CompletableFuture<Long>> listLatestOffsets(
+ Map<TopicPartition, CompletableFuture<OffsetResponse>> listLatestOffsets(
Set<TopicPartition> topicPartitions
);
+
+ /**
+ * A record to hold the offset and any associated error.
+ */
+ record OffsetResponse(
+ long offset,
+ Errors error
+ ) { }
}
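For reference, a minimal stub of the updated interface (hypothetical, not part
of this commit; useful e.g. in tests) could mirror the placeholder behaviour
that was removed from BrokerServer by completing every partition with offset
-1 and Errors.NONE:

    PartitionMetadataClient stub = new PartitionMetadataClient() {
        @Override
        public Map<TopicPartition, CompletableFuture<OffsetResponse>> listLatestOffsets(
                Set<TopicPartition> topicPartitions) {
            Map<TopicPartition, CompletableFuture<OffsetResponse>> result = new HashMap<>();
            // Complete every requested partition immediately with a placeholder offset.
            topicPartitions.forEach(tp ->
                result.put(tp, CompletableFuture.completedFuture(new OffsetResponse(-1L, Errors.NONE))));
            return result;
        }

        @Override
        public void close() { }
    };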
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 1a4efc6d989..10faf16b3a8 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -36,6 +36,7 @@ import
org.apache.kafka.common.errors.StreamsInvalidTopologyEpochException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.errors.StreamsTopologyFencedException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
@@ -151,6 +152,7 @@ import static
org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -3742,7 +3744,8 @@ public class GroupCoordinatorServiceTest {
Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new
TopicPartition(TOPIC_NAME, 1)));
when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
- .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1),
CompletableFuture.completedFuture(41L)));
+ .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1),
+ CompletableFuture.completedFuture(new
PartitionMetadataClient.OffsetResponse(41L, Errors.NONE))));
int partition = 1;
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
@@ -3980,7 +3983,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
- public void
testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetError() throws
InterruptedException, ExecutionException {
+ public void
testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetThrowsError()
throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
@@ -3994,7 +3997,7 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
- Exception ex = new Exception("failure");
+ Exception ex = new UnknownServerException("failure");
Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new
TopicPartition(TOPIC_NAME, 1)));
when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
@@ -4015,6 +4018,67 @@ public class GroupCoordinatorServiceTest {
.setPartitions(List.of(new
ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition)))));
+ ReadShareGroupStateSummaryResponseData
readShareGroupStateSummaryResponseData = new
ReadShareGroupStateSummaryResponseData()
+ .setResults(
+ List.of(new
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartition(partition)
+ .setStartOffset(21)
+ .setDeliveryCompleteCount(10)
+ .setStateEpoch(1)))
+ )
+ );
+
+ ReadShareGroupStateSummaryParameters
readShareGroupStateSummaryParameters =
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+ ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+ when(persister.readSummary(
+ ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ CompletionException responseException =
assertThrows(CompletionException.class, future::join);
+ assertInstanceOf(UnknownServerException.class,
responseException.getCause());
+ assertEquals("failure", responseException.getCause().getMessage());
+ }
+
+ @Test
+ public void
testDescribeShareGroupOffsetsWithDefaultPersisterLatestOffsetReturnsError()
throws InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+
+ PartitionMetadataClient partitionMetadataClient =
mock(PartitionMetadataClient.class);
+
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .setPartitionMetadataClient(partitionMetadataClient)
+ .build(true);
+ service.startup(() -> 1);
+
+ Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new
TopicPartition(TOPIC_NAME, 1)));
+ when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
+ .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1),
+ CompletableFuture.completedFuture(new
PartitionMetadataClient.OffsetResponse(-1, Errors.NETWORK_EXCEPTION))));
+
+ int partition = 1;
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ ReadShareGroupStateSummaryRequestData
readShareGroupStateSummaryRequestData = new
ReadShareGroupStateSummaryRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartition(partition)))));
+
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup
responseData = new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId("share-group-id")
.setTopics(
@@ -4023,8 +4087,8 @@ public class GroupCoordinatorServiceTest {
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
- .setErrorCode(Errors.forException(ex).code())
- .setErrorMessage(ex.getMessage())
+ .setErrorCode(Errors.NETWORK_EXCEPTION.code())
+ .setErrorMessage(Errors.NETWORK_EXCEPTION.message())
))
)
);
@@ -4053,6 +4117,77 @@ public class GroupCoordinatorServiceTest {
assertEquals(responseData, future.get());
}
+ @Test
+ public void testDescribeShareGroupAllOffsetsLatestOffsetError() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+
+ PartitionMetadataClient partitionMetadataClient =
mock(PartitionMetadataClient.class);
+
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .setPartitionMetadataClient(partitionMetadataClient)
+ .build(true);
+
+ Exception ex = new UnknownServerException("failure");
+
+ Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new
TopicPartition(TOPIC_NAME, 1)));
+ when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
+ .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1),
CompletableFuture.failedFuture(ex)));
+
+ MetadataImage image = new MetadataImageBuilder()
+ .addTopic(TOPIC_ID, TOPIC_NAME, 3)
+ .build();
+
+ service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image),
null);
+
+ int partition = 1;
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-initialized-partitions"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID,
Set.of(partition))));
+
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId("share-group-id")
+ .setTopics(null);
+
+ ReadShareGroupStateSummaryRequestData
readShareGroupStateSummaryRequestData = new
ReadShareGroupStateSummaryRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartition(partition)))));
+
+ ReadShareGroupStateSummaryResponseData
readShareGroupStateSummaryResponseData = new
ReadShareGroupStateSummaryResponseData()
+ .setResults(
+ List.of(new
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartition(partition)
+ .setStartOffset(21)
+ .setDeliveryCompleteCount(10)
+ .setStateEpoch(1)))
+ )
+ );
+
+ ReadShareGroupStateSummaryParameters
readShareGroupStateSummaryParameters =
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+ ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+ when(persister.readSummary(
+ ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
+
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ CompletionException responseException =
assertThrows(CompletionException.class, future::join);
+ assertInstanceOf(UnknownServerException.class,
responseException.getCause());
+ assertEquals("failure", responseException.getCause().getMessage());
+ }
+
@Test
public void testDescribeShareGroupOffsetsCoordinatorNotActive() throws
ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
@@ -4126,7 +4261,8 @@ public class GroupCoordinatorServiceTest {
Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new
TopicPartition(TOPIC_NAME, 1)));
when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
- .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1),
CompletableFuture.completedFuture(41L)));
+ .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1),
+ CompletableFuture.completedFuture(new
PartitionMetadataClient.OffsetResponse(41L, Errors.NONE))));
MetadataImage image = new MetadataImageBuilder()
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
@@ -4334,89 +4470,6 @@ public class GroupCoordinatorServiceTest {
assertEquals(responseData, future.get());
}
- @Test
- public void testDescribeShareGroupAllOffsetsLatestOffsetError() throws
InterruptedException, ExecutionException {
- CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
- Persister persister = mock(DefaultStatePersister.class);
-
- PartitionMetadataClient partitionMetadataClient =
mock(PartitionMetadataClient.class);
-
- GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
- .setConfig(createConfig())
- .setRuntime(runtime)
- .setPersister(persister)
- .setPartitionMetadataClient(partitionMetadataClient)
- .build(true);
-
- Exception ex = new Exception("failure");
-
- Set<TopicPartition> partitionsToComputeLag = new HashSet<>(Set.of(new
TopicPartition(TOPIC_NAME, 1)));
- when(partitionMetadataClient.listLatestOffsets(partitionsToComputeLag))
- .thenReturn(Map.of(new TopicPartition(TOPIC_NAME, 1),
CompletableFuture.failedFuture(ex)));
-
- MetadataImage image = new MetadataImageBuilder()
- .addTopic(TOPIC_ID, TOPIC_NAME, 3)
- .build();
-
- service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image),
null);
-
- int partition = 1;
-
- when(runtime.scheduleReadOperation(
- ArgumentMatchers.eq("share-group-initialized-partitions"),
- ArgumentMatchers.any(),
- ArgumentMatchers.any()
- )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID,
Set.of(partition))));
-
-
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
- .setGroupId("share-group-id")
- .setTopics(null);
-
- ReadShareGroupStateSummaryRequestData
readShareGroupStateSummaryRequestData = new
ReadShareGroupStateSummaryRequestData()
- .setGroupId("share-group-id")
- .setTopics(List.of(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
- .setTopicId(TOPIC_ID)
- .setPartitions(List.of(new
ReadShareGroupStateSummaryRequestData.PartitionData()
- .setPartition(partition)))));
-
-
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup
responseData = new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
- .setGroupId("share-group-id")
- .setTopics(
- List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
- .setTopicName(TOPIC_NAME)
- .setTopicId(TOPIC_ID)
- .setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
- .setPartitionIndex(partition)
- .setErrorCode(Errors.forException(ex).code())
- .setErrorMessage(ex.getMessage())
- ))
- )
- );
-
- ReadShareGroupStateSummaryResponseData
readShareGroupStateSummaryResponseData = new
ReadShareGroupStateSummaryResponseData()
- .setResults(
- List.of(new
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
- .setTopicId(TOPIC_ID)
- .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
- .setPartition(partition)
- .setStartOffset(21)
- .setDeliveryCompleteCount(10)
- .setStateEpoch(1)))
- )
- );
-
- ReadShareGroupStateSummaryParameters
readShareGroupStateSummaryParameters =
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
- ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
- when(persister.readSummary(
- ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
-
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
-
-
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
-
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
-
- assertEquals(responseData, future.get());
- }
-
@Test
public void testDescribeShareGroupAllOffsetsCoordinatorNotActive() throws
ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClientTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClientTest.java
new file mode 100644
index 00000000000..49e4bf8c783
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClientTest.java
@@ -0,0 +1,972 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
+import
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.util.MockTime;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class NetworkPartitionMetadataClientTest {
+ private static final MockTime MOCK_TIME = new MockTime();
+ private static final MetadataCache METADATA_CACHE =
mock(MetadataCache.class);
+ private static final Supplier<KafkaClient> KAFKA_CLIENT_SUPPLIER = () ->
mock(KafkaClient.class);
+ private static final String HOST = "localhost";
+ private static final int PORT = 9092;
+ private static final ListenerName LISTENER_NAME =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
+ private static final String TOPIC = "test-topic";
+ private static final int PARTITION = 0;
+ private static final Node LEADER_NODE = new Node(1, HOST, PORT);
+
+ private NetworkPartitionMetadataClient networkPartitionMetadataClient;
+
+ private static class NetworkPartitionMetadataClientBuilder {
+ private MetadataCache metadataCache = METADATA_CACHE;
+ private Supplier<KafkaClient> kafkaClientSupplier =
KAFKA_CLIENT_SUPPLIER;
+
+ NetworkPartitionMetadataClientBuilder withMetadataCache(MetadataCache
metadataCache) {
+ this.metadataCache = metadataCache;
+ return this;
+ }
+
+ NetworkPartitionMetadataClientBuilder
withKafkaClientSupplier(Supplier<KafkaClient> kafkaClientSupplier) {
+ this.kafkaClientSupplier = kafkaClientSupplier;
+ return this;
+ }
+
+ static NetworkPartitionMetadataClientBuilder builder() {
+ return new NetworkPartitionMetadataClientBuilder();
+ }
+
+ NetworkPartitionMetadataClient build() {
+ return new NetworkPartitionMetadataClient(metadataCache,
kafkaClientSupplier, MOCK_TIME, LISTENER_NAME);
+ }
+ }
+
+ @BeforeEach
+ public void setUp() {
+ networkPartitionMetadataClient = null;
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (networkPartitionMetadataClient != null) {
+ try {
+ networkPartitionMetadataClient.close();
+ } catch (Exception e) {
+ fail("Failed to close NetworkPartitionMetadataClient", e);
+ }
+ }
+ }
+
+ @Test
+ public void testListLatestOffsetsSuccess() throws ExecutionException,
InterruptedException {
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ long expectedOffset = 100L;
+ MetadataCache metadataCache = mock(MetadataCache.class);
+ MockClient client = new MockClient(MOCK_TIME);
+
+ // Mock metadata cache to return leader node
+ when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION,
LISTENER_NAME))
+ .thenReturn(Optional.of(LEADER_NODE));
+
+ // Prepare response for ListOffsets request
+ client.prepareResponseFrom(body -> {
+ if (body instanceof ListOffsetsRequest request) {
+ ListOffsetsTopic requestTopic = request.data().topics().get(0);
+ return requestTopic.name().equals(TOPIC) &&
+ requestTopic.partitions().get(0).partitionIndex() ==
PARTITION;
+ }
+ return false;
+ }, new ListOffsetsResponse(
+ new org.apache.kafka.common.message.ListOffsetsResponseData()
+ .setTopics(List.of(
+ new ListOffsetsTopicResponse()
+ .setName(TOPIC)
+ .setPartitions(List.of(
+ new ListOffsetsPartitionResponse()
+ .setPartitionIndex(PARTITION)
+ .setErrorCode(Errors.NONE.code())
+ .setOffset(expectedOffset)
+ .setTimestamp(System.currentTimeMillis())
+ ))
+ ))
+ ), LEADER_NODE);
+
+ networkPartitionMetadataClient =
NetworkPartitionMetadataClientBuilder.builder()
+ .withMetadataCache(metadataCache)
+ .withKafkaClientSupplier(() -> client)
+ .build();
+
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(tp);
+
+ Map<TopicPartition,
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+ networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+ assertNotNull(futures);
+ assertEquals(1, futures.size());
+ assertTrue(futures.containsKey(tp));
+
+ PartitionMetadataClient.OffsetResponse response =
futures.get(tp).get();
+ assertTrue(futures.get(tp).isDone() &&
!futures.get(tp).isCompletedExceptionally());
+ assertNotNull(response);
+ assertEquals(Errors.NONE.code(), response.error().code());
+ assertEquals(expectedOffset, response.offset());
+ }
+
+ @Test
+ public void testListLatestOffsetsEmptyPartitionLeader() throws
ExecutionException, InterruptedException {
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ MetadataCache metadataCache = mock(MetadataCache.class);
+
+ // Mock metadata cache to return an empty leader endpoint
+ when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION,
LISTENER_NAME))
+ .thenReturn(Optional.empty());
+
+ networkPartitionMetadataClient =
NetworkPartitionMetadataClientBuilder.builder()
+ .withMetadataCache(metadataCache)
+ .build();
+
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(tp);
+
+ Map<TopicPartition,
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+ networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+ assertNotNull(futures);
+ assertEquals(1, futures.size());
+ assertTrue(futures.containsKey(tp));
+
+ PartitionMetadataClient.OffsetResponse response =
futures.get(tp).get();
+ assertTrue(futures.get(tp).isDone() &&
!futures.get(tp).isCompletedExceptionally());
+ assertNotNull(response);
+ assertEquals(Errors.LEADER_NOT_AVAILABLE.code(),
response.error().code());
+ }
+
+ @Test
+ public void testListLatestOffsetsNoNodePartitionLeader() throws
ExecutionException, InterruptedException {
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ MetadataCache metadataCache = mock(MetadataCache.class);
+
+ // Mock metadata cache to return Node.noNode() as the leader
+ when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION,
LISTENER_NAME))
+ .thenReturn(Optional.of(Node.noNode()));
+
+ networkPartitionMetadataClient =
NetworkPartitionMetadataClientBuilder.builder()
+ .withMetadataCache(metadataCache)
+ .build();
+
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(tp);
+
+ Map<TopicPartition,
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+ networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+ assertNotNull(futures);
+ assertEquals(1, futures.size());
+ assertTrue(futures.containsKey(tp));
+
+ PartitionMetadataClient.OffsetResponse response =
futures.get(tp).get();
+ assertTrue(futures.get(tp).isDone() &&
!futures.get(tp).isCompletedExceptionally());
+ assertNotNull(response);
+ assertEquals(Errors.LEADER_NOT_AVAILABLE.code(),
response.error().code());
+ }
+
+ @Test
+ public void testListLatestOffsetsNullResponseBody() throws
ExecutionException, InterruptedException {
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ MetadataCache metadataCache = mock(MetadataCache.class);
+ MockClient client = new MockClient(MOCK_TIME);
+
+ // Mock metadata cache to return leader node
+ when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION,
LISTENER_NAME))
+ .thenReturn(Optional.of(LEADER_NODE));
+
+ // Prepare null response
+ client.prepareResponseFrom(body -> {
+ if (body instanceof ListOffsetsRequest) {
+ ListOffsetsRequest request = (ListOffsetsRequest) body;
+ ListOffsetsTopic requestTopic = request.data().topics().get(0);
+ return requestTopic.name().equals(TOPIC) &&
+ requestTopic.partitions().get(0).partitionIndex() ==
PARTITION;
+ }
+ return false;
+ }, null, LEADER_NODE);
+
+ networkPartitionMetadataClient =
NetworkPartitionMetadataClientBuilder.builder()
+ .withMetadataCache(metadataCache)
+ .withKafkaClientSupplier(() -> client)
+ .build();
+
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(tp);
+
+ Map<TopicPartition,
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+ networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+ assertNotNull(futures);
+ assertEquals(1, futures.size());
+ assertTrue(futures.containsKey(tp));
+
+ PartitionMetadataClient.OffsetResponse response =
futures.get(tp).get();
+ assertTrue(futures.get(tp).isDone() &&
!futures.get(tp).isCompletedExceptionally());
+ assertNotNull(response);
+ assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(),
response.error().code());
+ }
+
+ @Test
+ public void testListLatestOffsetsNullResponse() throws ExecutionException,
InterruptedException {
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ CompletableFuture<PartitionMetadataClient.OffsetResponse>
partitionFuture = new CompletableFuture<>();
+ Map<TopicPartition,
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+ tp,
+ partitionFuture
+ );
+ networkPartitionMetadataClient =
NetworkPartitionMetadataClientBuilder.builder().build();
+ // Pass null as clientResponse.
+ networkPartitionMetadataClient.handleResponse(futures, null);
+ assertTrue(partitionFuture.isDone() &&
!partitionFuture.isCompletedExceptionally());
+ PartitionMetadataClient.OffsetResponse response =
partitionFuture.get();
+ assertEquals(-1, response.offset());
+ assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(),
response.error().code());
+ }
+
+ @Test
+ public void testListLatestOffsetsAuthenticationError() throws
ExecutionException, InterruptedException {
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ CompletableFuture<PartitionMetadataClient.OffsetResponse>
partitionFuture = new CompletableFuture<>();
+ Map<TopicPartition,
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+ tp,
+ partitionFuture
+ );
+ AuthenticationException authenticationException = new
AuthenticationException("Test authentication exception");
+ ClientResponse clientResponse = mock(ClientResponse.class);
+ // Mock authentication exception in client response.
+
when(clientResponse.authenticationException()).thenReturn(authenticationException);
+ networkPartitionMetadataClient =
NetworkPartitionMetadataClientBuilder.builder().build();
+ networkPartitionMetadataClient.handleResponse(futures, clientResponse);
+ assertTrue(partitionFuture.isDone() &&
!partitionFuture.isCompletedExceptionally());
+ PartitionMetadataClient.OffsetResponse response =
partitionFuture.get();
+ assertEquals(-1, response.offset());
+ assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(),
response.error().code());
+ }
+
+ @Test
+ public void testListLatestOffsetsVersionMismatch() throws
ExecutionException, InterruptedException {
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ CompletableFuture<PartitionMetadataClient.OffsetResponse>
partitionFuture = new CompletableFuture<>();
+ Map<TopicPartition,
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+ tp,
+ partitionFuture
+ );
+ UnsupportedVersionException unsupportedVersionException = new
UnsupportedVersionException("Test unsupportedVersionException exception");
+ ClientResponse clientResponse = mock(ClientResponse.class);
+ when(clientResponse.authenticationException()).thenReturn(null);
+ // Mock version mismatch exception in client response.
+
when(clientResponse.versionMismatch()).thenReturn(unsupportedVersionException);
+ networkPartitionMetadataClient =
NetworkPartitionMetadataClientBuilder.builder().build();
+ networkPartitionMetadataClient.handleResponse(futures, clientResponse);
+ assertTrue(partitionFuture.isDone() &&
!partitionFuture.isCompletedExceptionally());
+ PartitionMetadataClient.OffsetResponse response =
partitionFuture.get();
+ assertEquals(-1, response.offset());
+ assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(),
response.error().code());
+ }
+
+ @Test
+ public void testListLatestOffsetsDisconnected() throws ExecutionException,
InterruptedException {
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ CompletableFuture<PartitionMetadataClient.OffsetResponse>
partitionFuture = new CompletableFuture<>();
+ Map<TopicPartition,
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+ tp,
+ partitionFuture
+ );
+ ClientResponse clientResponse = mock(ClientResponse.class);
+ when(clientResponse.authenticationException()).thenReturn(null);
+ when(clientResponse.versionMismatch()).thenReturn(null);
+ // Mock disconnected in client response.
+ when(clientResponse.wasDisconnected()).thenReturn(true);
+ networkPartitionMetadataClient =
NetworkPartitionMetadataClientBuilder.builder().build();
+ networkPartitionMetadataClient.handleResponse(futures, clientResponse);
+ assertTrue(partitionFuture.isDone() &&
!partitionFuture.isCompletedExceptionally());
+ PartitionMetadataClient.OffsetResponse response =
partitionFuture.get();
+ assertEquals(-1, response.offset());
+ assertEquals(Errors.NETWORK_EXCEPTION.code(), response.error().code());
+ }
+
+ @Test
+ public void testListLatestOffsetsTimedOut() throws ExecutionException,
InterruptedException {
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ CompletableFuture<PartitionMetadataClient.OffsetResponse>
partitionFuture = new CompletableFuture<>();
+ Map<TopicPartition,
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+ tp,
+ partitionFuture
+ );
+ ClientResponse clientResponse = mock(ClientResponse.class);
+ when(clientResponse.authenticationException()).thenReturn(null);
+ when(clientResponse.versionMismatch()).thenReturn(null);
+ when(clientResponse.wasDisconnected()).thenReturn(false);
+ // Mock timed out in client response.
+ when(clientResponse.wasTimedOut()).thenReturn(true);
+ networkPartitionMetadataClient =
NetworkPartitionMetadataClientBuilder.builder().build();
+ networkPartitionMetadataClient.handleResponse(futures, clientResponse);
+ assertTrue(partitionFuture.isDone() &&
!partitionFuture.isCompletedExceptionally());
+ PartitionMetadataClient.OffsetResponse response =
partitionFuture.get();
+ assertEquals(-1, response.offset());
+ assertEquals(Errors.REQUEST_TIMED_OUT.code(), response.error().code());
+ }
+
+ @Test
+ public void testListLatestOffsetsMultiplePartitionsSameLeader() throws ExecutionException, InterruptedException {
+ TopicPartition tp1 = new TopicPartition(TOPIC, PARTITION);
+ TopicPartition tp2 = new TopicPartition(TOPIC, 1);
+ long expectedOffset1 = 100L;
+ long expectedOffset2 = 200L;
+
+ MetadataCache metadataCache = mock(MetadataCache.class);
+ MockClient client = new MockClient(MOCK_TIME);
+
+ // Mock metadata cache to return same leader for both partitions
+ when(metadataCache.getPartitionLeaderEndpoint(TOPIC, 0, LISTENER_NAME))
+ .thenReturn(Optional.of(LEADER_NODE));
+ when(metadataCache.getPartitionLeaderEndpoint(TOPIC, 1, LISTENER_NAME))
+ .thenReturn(Optional.of(LEADER_NODE));
+
+ // Prepare response for ListOffsets request with both partitions
+ client.prepareResponseFrom(body -> {
+ if (body instanceof ListOffsetsRequest) {
+ ListOffsetsRequest request = (ListOffsetsRequest) body;
+ ListOffsetsTopic requestTopic = request.data().topics().get(0);
+ return requestTopic.name().equals(TOPIC) &&
+ requestTopic.partitions().size() == 2;
+ }
+ return false;
+ }, new ListOffsetsResponse(
+ new org.apache.kafka.common.message.ListOffsetsResponseData()
+ .setTopics(List.of(
+ new ListOffsetsTopicResponse()
+ .setName(TOPIC)
+ .setPartitions(List.of(
+ new ListOffsetsPartitionResponse()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code())
+ .setOffset(expectedOffset1)
+ .setTimestamp(System.currentTimeMillis()),
+ new ListOffsetsPartitionResponse()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.NONE.code())
+ .setOffset(expectedOffset2)
+ .setTimestamp(System.currentTimeMillis())
+ ))
+ ))
+ ), LEADER_NODE);
+
+ networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+ .withMetadataCache(metadataCache)
+ .withKafkaClientSupplier(() -> client)
+ .build();
+
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(tp1);
+ partitions.add(tp2);
+
+ Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+ networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+ assertNotNull(futures);
+ assertEquals(2, futures.size());
+ assertTrue(futures.containsKey(tp1));
+ assertTrue(futures.containsKey(tp2));
+
+ PartitionMetadataClient.OffsetResponse response1 = futures.get(tp1).get();
+ assertTrue(futures.get(tp1).isDone() && !futures.get(tp1).isCompletedExceptionally());
+ assertNotNull(response1);
+ assertEquals(Errors.NONE.code(), response1.error().code());
+ assertEquals(expectedOffset1, response1.offset());
+
+ PartitionMetadataClient.OffsetResponse response2 = futures.get(tp2).get();
+ assertTrue(futures.get(tp2).isDone() && !futures.get(tp2).isCompletedExceptionally());
+ assertNotNull(response2);
+ assertEquals(Errors.NONE.code(), response2.error().code());
+ assertEquals(expectedOffset2, response2.offset());
+ }
+
+ @Test
+ public void testListLatestOffsetsMultiplePartitionsDifferentLeaders() throws ExecutionException, InterruptedException {
+ TopicPartition tp1 = new TopicPartition(TOPIC, 0);
+ TopicPartition tp2 = new TopicPartition(TOPIC, 1);
+ Node leaderNode1 = LEADER_NODE;
+ Node leaderNode2 = new Node(2, HOST, PORT + 1);
+ long expectedOffset1 = 100L;
+ long expectedOffset2 = 200L;
+
+ MetadataCache metadataCache = mock(MetadataCache.class);
+ MockClient client = new MockClient(MOCK_TIME);
+
+ // Mock metadata cache to return different leaders
+ when(metadataCache.getPartitionLeaderEndpoint(TOPIC, 0, LISTENER_NAME))
+ .thenReturn(Optional.of(leaderNode1));
+ when(metadataCache.getPartitionLeaderEndpoint(TOPIC, 1, LISTENER_NAME))
+ .thenReturn(Optional.of(leaderNode2));
+
+ // Prepare response for first leader
+ client.prepareResponseFrom(body -> {
+ if (body instanceof ListOffsetsRequest) {
+ ListOffsetsRequest request = (ListOffsetsRequest) body;
+ ListOffsetsTopic requestTopic = request.data().topics().get(0);
+ return requestTopic.name().equals(TOPIC) &&
+ requestTopic.partitions().size() == 1 &&
+ requestTopic.partitions().get(0).partitionIndex() == 0;
+ }
+ return false;
+ }, new ListOffsetsResponse(
+ new org.apache.kafka.common.message.ListOffsetsResponseData()
+ .setTopics(List.of(
+ new ListOffsetsTopicResponse()
+ .setName(TOPIC)
+ .setPartitions(List.of(
+ new ListOffsetsPartitionResponse()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code())
+ .setOffset(expectedOffset1)
+ .setTimestamp(System.currentTimeMillis())
+ ))
+ ))
+ ), leaderNode1);
+
+ // Prepare response for second leader
+ client.prepareResponseFrom(body -> {
+ if (body instanceof ListOffsetsRequest) {
+ ListOffsetsRequest request = (ListOffsetsRequest) body;
+ ListOffsetsTopic requestTopic = request.data().topics().get(0);
+ return requestTopic.name().equals(TOPIC) &&
+ requestTopic.partitions().size() == 1 &&
+ requestTopic.partitions().get(0).partitionIndex() == 1;
+ }
+ return false;
+ }, new ListOffsetsResponse(
+ new org.apache.kafka.common.message.ListOffsetsResponseData()
+ .setTopics(List.of(
+ new ListOffsetsTopicResponse()
+ .setName(TOPIC)
+ .setPartitions(List.of(
+ new ListOffsetsPartitionResponse()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.NONE.code())
+ .setOffset(expectedOffset2)
+ .setTimestamp(System.currentTimeMillis())
+ ))
+ ))
+ ), leaderNode2);
+
+ networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+ .withMetadataCache(metadataCache)
+ .withKafkaClientSupplier(() -> client)
+ .build();
+
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(tp1);
+ partitions.add(tp2);
+
+ Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+ networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+ assertNotNull(futures);
+ assertEquals(2, futures.size());
+ assertTrue(futures.containsKey(tp1));
+ assertTrue(futures.containsKey(tp2));
+
+ PartitionMetadataClient.OffsetResponse response1 = futures.get(tp1).get();
+ assertTrue(futures.get(tp1).isDone() && !futures.get(tp1).isCompletedExceptionally());
+ assertNotNull(response1);
+ assertEquals(Errors.NONE.code(), response1.error().code());
+ assertEquals(expectedOffset1, response1.offset());
+
+ PartitionMetadataClient.OffsetResponse response2 = futures.get(tp2).get();
+ assertTrue(futures.get(tp2).isDone() && !futures.get(tp2).isCompletedExceptionally());
+ assertNotNull(response2);
+ assertEquals(Errors.NONE.code(), response2.error().code());
+ assertEquals(expectedOffset2, response2.offset());
+ }
+
+ @Test
+ public void testListLatestOffsetsMultipleTopics() throws ExecutionException, InterruptedException {
+ String topic1 = TOPIC;
+ String topic2 = "test-topic-2";
+ TopicPartition tp1 = new TopicPartition(topic1, 0);
+ TopicPartition tp2 = new TopicPartition(topic2, 0);
+ long expectedOffset1 = 100L;
+ long expectedOffset2 = 200L;
+
+ MetadataCache metadataCache = mock(MetadataCache.class);
+ MockClient client = new MockClient(MOCK_TIME);
+
+ // Mock metadata cache to return same leader for both topics
+ when(metadataCache.getPartitionLeaderEndpoint(topic1, 0, LISTENER_NAME))
+ .thenReturn(Optional.of(LEADER_NODE));
+ when(metadataCache.getPartitionLeaderEndpoint(topic2, 0, LISTENER_NAME))
+ .thenReturn(Optional.of(LEADER_NODE));
+
+ // Prepare response for ListOffsets request with both topics
+ client.prepareResponseFrom(body -> {
+ if (body instanceof ListOffsetsRequest) {
+ ListOffsetsRequest request = (ListOffsetsRequest) body;
+ return request.data().topics().size() == 2;
+ }
+ return false;
+ }, new ListOffsetsResponse(
+ new org.apache.kafka.common.message.ListOffsetsResponseData()
+ .setTopics(List.of(
+ new ListOffsetsTopicResponse()
+ .setName(topic1)
+ .setPartitions(List.of(
+ new ListOffsetsPartitionResponse()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code())
+ .setOffset(expectedOffset1)
+ .setTimestamp(System.currentTimeMillis())
+ )),
+ new ListOffsetsTopicResponse()
+ .setName(topic2)
+ .setPartitions(List.of(
+ new ListOffsetsPartitionResponse()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code())
+ .setOffset(expectedOffset2)
+ .setTimestamp(System.currentTimeMillis())
+ ))
+ ))
+ ), LEADER_NODE);
+
+ networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+ .withMetadataCache(metadataCache)
+ .withKafkaClientSupplier(() -> client)
+ .build();
+
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(tp1);
+ partitions.add(tp2);
+
+ Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+ networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+ assertNotNull(futures);
+ assertEquals(2, futures.size());
+ assertTrue(futures.containsKey(tp1));
+ assertTrue(futures.containsKey(tp2));
+
+ PartitionMetadataClient.OffsetResponse response1 = futures.get(tp1).get();
+ assertTrue(futures.get(tp1).isDone() && !futures.get(tp1).isCompletedExceptionally());
+ assertNotNull(response1);
+ assertEquals(Errors.NONE.code(), response1.error().code());
+ assertEquals(expectedOffset1, response1.offset());
+
+ PartitionMetadataClient.OffsetResponse response2 = futures.get(tp2).get();
+ assertTrue(futures.get(tp2).isDone() && !futures.get(tp2).isCompletedExceptionally());
+ assertNotNull(response2);
+ assertEquals(Errors.NONE.code(), response2.error().code());
+ assertEquals(expectedOffset2, response2.offset());
+ }
+
+ @Test
+ public void testListLatestOffsetsNullPartitions() {
+ networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder().build();
+
+ Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+ networkPartitionMetadataClient.listLatestOffsets(null);
+
+ assertNotNull(futures);
+ assertTrue(futures.isEmpty());
+ }
+
+ @Test
+ public void testListLatestOffsetsEmptyPartitions() {
+ networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder().build();
+
+ Set<TopicPartition> partitions = new HashSet<>();
+
+ Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+ networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+ assertNotNull(futures);
+ assertTrue(futures.isEmpty());
+ }
+
+ @Test
+ public void testListLatestOffsetsServerError() throws ExecutionException, InterruptedException {
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ MetadataCache metadataCache = mock(MetadataCache.class);
+ MockClient client = new MockClient(MOCK_TIME);
+
+ // Mock metadata cache to return leader node
+ when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION, LISTENER_NAME))
+ .thenReturn(Optional.of(LEADER_NODE));
+
+ // Prepare error response
+ client.prepareResponseFrom(body -> {
+ if (body instanceof ListOffsetsRequest) {
+ ListOffsetsRequest request = (ListOffsetsRequest) body;
+ ListOffsetsTopic requestTopic = request.data().topics().get(0);
+ return requestTopic.name().equals(TOPIC) &&
+ requestTopic.partitions().get(0).partitionIndex() == PARTITION;
+ }
+ return false;
+ }, new ListOffsetsResponse(
+ new org.apache.kafka.common.message.ListOffsetsResponseData()
+ .setTopics(List.of(
+ new ListOffsetsTopicResponse()
+ .setName(TOPIC)
+ .setPartitions(List.of(
+ new ListOffsetsPartitionResponse()
+ .setPartitionIndex(PARTITION)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+ ))
+ ))
+ ), LEADER_NODE);
+
+ networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+ .withMetadataCache(metadataCache)
+ .withKafkaClientSupplier(() -> client)
+ .build();
+
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(tp);
+
+ Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+ networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+ assertNotNull(futures);
+ assertEquals(1, futures.size());
+ assertTrue(futures.containsKey(tp));
+
+ PartitionMetadataClient.OffsetResponse response = futures.get(tp).get();
+ assertTrue(futures.get(tp).isDone() && !futures.get(tp).isCompletedExceptionally());
+ assertNotNull(response);
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), response.error().code());
+ }
+
+ @Test
+ public void testListLatestOffsetsMissingPartitionInResponse() throws ExecutionException, InterruptedException {
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ MetadataCache metadataCache = mock(MetadataCache.class);
+ MockClient client = new MockClient(MOCK_TIME);
+
+ // Mock metadata cache to return leader node
+ when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION, LISTENER_NAME))
+ .thenReturn(Optional.of(LEADER_NODE));
+
+ // Prepare response without the requested partition
+ client.prepareResponseFrom(body -> {
+ if (body instanceof ListOffsetsRequest) {
+ ListOffsetsRequest request = (ListOffsetsRequest) body;
+ ListOffsetsTopic requestTopic = request.data().topics().get(0);
+ return requestTopic.name().equals(TOPIC) &&
+ requestTopic.partitions().get(0).partitionIndex() == PARTITION;
+ }
+ return false;
+ }, new ListOffsetsResponse(
+ new org.apache.kafka.common.message.ListOffsetsResponseData()
+ .setTopics(List.of(
+ new ListOffsetsTopicResponse()
+ .setName(TOPIC)
+ .setPartitions(List.of())
+ ))
+ ), LEADER_NODE);
+
+ networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+ .withMetadataCache(metadataCache)
+ .withKafkaClientSupplier(() -> client)
+ .build();
+
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(tp);
+
+ Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
+ networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+ assertNotNull(futures);
+ assertEquals(1, futures.size());
+ assertTrue(futures.containsKey(tp));
+
+ PartitionMetadataClient.OffsetResponse response = futures.get(tp).get();
+ assertTrue(futures.get(tp).isDone() && !futures.get(tp).isCompletedExceptionally());
+ assertNotNull(response);
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), response.error().code());
+ }
+
+ @Test
+ public void testClose() {
+ KafkaClient client = mock(KafkaClient.class);
+ networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+ .withKafkaClientSupplier(() -> client)
+ .build();
+ try {
+ verify(client, times(0)).close();
+ // Ensure send thread is initialized.
+ networkPartitionMetadataClient.ensureSendThreadInitialized();
+ networkPartitionMetadataClient.close();
+ // KafkaClient is closed when NetworkPartitionMetadataClient is closed.
+ verify(client, times(1)).close();
+ } catch (Exception e) {
+ fail("unexpected exception", e);
+ }
+ }
+
+ @Test
+ public void testLazyInitialization() {
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ long expectedOffset = 100L;
+ MetadataCache metadataCache = mock(MetadataCache.class);
+ MockClient client = new MockClient(MOCK_TIME);
+
+ // Track if client supplier was called
+ final boolean[] supplierCalled = {false};
+ Supplier<KafkaClient> kafkaClientSupplier = () -> {
+ supplierCalled[0] = true;
+ return client;
+ };
+
+ // Mock metadata cache to return leader node
+ when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION, LISTENER_NAME))
+ .thenReturn(Optional.of(LEADER_NODE));
+
+ // Prepare response for ListOffsets request
+ client.prepareResponseFrom(body -> {
+ if (body instanceof ListOffsetsRequest request) {
+ ListOffsetsTopic requestTopic = request.data().topics().get(0);
+ return requestTopic.name().equals(TOPIC) &&
+ requestTopic.partitions().get(0).partitionIndex() == PARTITION;
+ }
+ return false;
+ }, new ListOffsetsResponse(
+ new org.apache.kafka.common.message.ListOffsetsResponseData()
+ .setTopics(List.of(
+ new ListOffsetsTopicResponse()
+ .setName(TOPIC)
+ .setPartitions(List.of(
+ new ListOffsetsPartitionResponse()
+ .setPartitionIndex(PARTITION)
+ .setErrorCode(Errors.NONE.code())
+ .setOffset(expectedOffset)
+ .setTimestamp(System.currentTimeMillis())
+ ))
+ ))
+ ), LEADER_NODE);
+
+ networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+ .withMetadataCache(metadataCache)
+ .withKafkaClientSupplier(kafkaClientSupplier)
+ .build();
+
+ // Verify supplier was not called before listLatestOffsets
+ assertFalse(supplierCalled[0]);
+
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(tp);
+
+ networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+ // Verify supplier was called during listLatestOffsets
+ assertTrue(supplierCalled[0]);
+ }
+
+ @Test
+ public void testCloseWithoutInitialization() throws IOException {
+ KafkaClient client = mock(KafkaClient.class);
+ final boolean[] supplierCalled = {false};
+ Supplier<KafkaClient> clientSupplier = () -> {
+ supplierCalled[0] = true;
+ return client;
+ };
+
+ networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+ .withKafkaClientSupplier(clientSupplier)
+ .build();
+
+ // Close without calling listLatestOffsets
+ networkPartitionMetadataClient.close();
+
+ // Verify supplier was never called
+ assertFalse(supplierCalled[0]);
+ // Verify client.close() was never called since sendThread was never initialized
+ verify(client, never()).close();
+ }
+
+ @Test
+ public void testLazyInitializationWithEmptyPartitions() {
+ MetadataCache metadataCache = mock(MetadataCache.class);
+ final boolean[] supplierCalled = {false};
+ Supplier<KafkaClient> clientSupplier = () -> {
+ supplierCalled[0] = true;
+ return mock(KafkaClient.class);
+ };
+
+ networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+ .withMetadataCache(metadataCache)
+ .withKafkaClientSupplier(clientSupplier)
+ .build();
+
+ // Call listLatestOffsets with empty partitions
+ networkPartitionMetadataClient.listLatestOffsets(new HashSet<>());
+
+ // Verify supplier was not called since no partitions were provided.
+ assertFalse(supplierCalled[0]);
+ }
+
+ @Test
+ public void testLazyInitializationWithNullPartitions() {
+ MetadataCache metadataCache = mock(MetadataCache.class);
+ final boolean[] supplierCalled = {false};
+ Supplier<KafkaClient> clientSupplier = () -> {
+ supplierCalled[0] = true;
+ return mock(KafkaClient.class);
+ };
+
+ networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+ .withMetadataCache(metadataCache)
+ .withKafkaClientSupplier(clientSupplier)
+ .build();
+
+ // Call listLatestOffsets with null partitions
+ networkPartitionMetadataClient.listLatestOffsets(null);
+
+ // Verify supplier was not called since no partitions were provided.
+ assertFalse(supplierCalled[0]);
+ }
+
+ @Test
+ public void testLazyInitializationOnlyOnce() {
+ TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+ long expectedOffset = 100L;
+ MetadataCache metadataCache = mock(MetadataCache.class);
+ MockClient client = new MockClient(MOCK_TIME);
+
+ // Track how many times supplier was called
+ final int[] supplierCallCount = {0};
+ Supplier<KafkaClient> clientSupplier = () -> {
+ supplierCallCount[0]++;
+ return client;
+ };
+
+ // Mock metadata cache to return leader node
+ when(metadataCache.getPartitionLeaderEndpoint(TOPIC, PARTITION, LISTENER_NAME))
+ .thenReturn(Optional.of(LEADER_NODE));
+
+ // Prepare multiple responses for multiple calls
+ client.prepareResponseFrom(body -> {
+ if (body instanceof ListOffsetsRequest request) {
+ ListOffsetsTopic requestTopic = request.data().topics().get(0);
+ return requestTopic.name().equals(TOPIC) &&
+ requestTopic.partitions().get(0).partitionIndex() == PARTITION;
+ }
+ return false;
+ }, new ListOffsetsResponse(
+ new org.apache.kafka.common.message.ListOffsetsResponseData()
+ .setTopics(List.of(
+ new ListOffsetsTopicResponse()
+ .setName(TOPIC)
+ .setPartitions(List.of(
+ new ListOffsetsPartitionResponse()
+ .setPartitionIndex(PARTITION)
+ .setErrorCode(Errors.NONE.code())
+ .setOffset(expectedOffset)
+ .setTimestamp(System.currentTimeMillis())
+ ))
+ ))
+ ), LEADER_NODE);
+
+ client.prepareResponseFrom(body -> {
+ if (body instanceof ListOffsetsRequest request) {
+ ListOffsetsTopic requestTopic = request.data().topics().get(0);
+ return requestTopic.name().equals(TOPIC) &&
+ requestTopic.partitions().get(0).partitionIndex() == PARTITION;
+ }
+ return false;
+ }, new ListOffsetsResponse(
+ new org.apache.kafka.common.message.ListOffsetsResponseData()
+ .setTopics(List.of(
+ new ListOffsetsTopicResponse()
+ .setName(TOPIC)
+ .setPartitions(List.of(
+ new ListOffsetsPartitionResponse()
+ .setPartitionIndex(PARTITION)
+ .setErrorCode(Errors.NONE.code())
+ .setOffset(expectedOffset + 1)
+ .setTimestamp(System.currentTimeMillis())
+ ))
+ ))
+ ), LEADER_NODE);
+
+ networkPartitionMetadataClient = NetworkPartitionMetadataClientBuilder.builder()
+ .withMetadataCache(metadataCache)
+ .withKafkaClientSupplier(clientSupplier)
+ .build();
+
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(tp);
+
+ // First call to listLatestOffsets
+ networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+ // Verify supplier was called once
+ assertEquals(1, supplierCallCount[0]);
+
+ // Second call to listLatestOffsets
+ networkPartitionMetadataClient.listLatestOffsets(partitions);
+
+ // Verify supplier was still only called once (not again)
+ assertEquals(1, supplierCallCount[0]);
+ }
+}
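
Editor's note: a minimal sketch (not part of the patch) of how a caller might consume the futures returned by listLatestOffsets when computing partition end offsets. The class and helper names below are hypothetical; only listLatestOffsets, OffsetResponse#offset and OffsetResponse#error are taken from the patch, and error handling is reduced to skipping failed partitions.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;

// Hypothetical caller; assumes it lives in the same package as PartitionMetadataClient.
public class PartitionEndOffsetsExample {

    // Blocks on each future and keeps only the offsets that came back without error.
    public static Map<TopicPartition, Long> endOffsets(
            PartitionMetadataClient client,
            Set<TopicPartition> partitions) throws Exception {
        Map<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures =
            client.listLatestOffsets(partitions);
        Map<TopicPartition, Long> result = new HashMap<>();
        for (Map.Entry<TopicPartition, CompletableFuture<PartitionMetadataClient.OffsetResponse>> entry : futures.entrySet()) {
            PartitionMetadataClient.OffsetResponse response = entry.getValue().get();
            if (response.error().code() == Errors.NONE.code()) {
                result.put(entry.getKey(), response.offset());
            }
        }
        return result;
    }
}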