This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a592912ec94 KAFKA-17663 Add metadata caching in
PartitionLeaderStrategy (#17367)
a592912ec94 is described below
commit a592912ec94b021a2bd7485abb9ebda02dc01766
Author: Andrew Schofield <[email protected]>
AuthorDate: Mon Nov 18 06:45:06 2024 +0000
KAFKA-17663 Add metadata caching in PartitionLeaderStrategy (#17367)
Admin API operations have two phases: lookup and fulfilment. The lookup
phase involves a METADATA request whose details depend upon the operation being
performed.
For some operations, the METADATA request can be quite expensive to serve.
For example, if the user calls Admin.listOffsets for 1000 topics, the METADATA
request will include all 1000 topics and the response will contain the leader
information for all of these topics. And then the actual fulfilment phase does
the real work of the operation.
In cases where a long-running application is performing repeated admin
operations which need the same metadata information about partition leadership,
it is not necessary to send the METADATA request for every single admin
operation.
This PR adds a cache of the mapping from topic-partition to leader id to
the admin client. The cache doesn't need to be very sophisticated because the
admin client will retry if the information becomes stale, and the cache can be
updated as a result of the retry.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 18 +-
.../admin/internals/AbortTransactionHandler.java | 8 +-
.../clients/admin/internals/AdminApiDriver.java | 8 +-
.../clients/admin/internals/AdminApiFuture.java | 20 +
.../admin/internals/DeleteRecordsHandler.java | 8 +-
.../admin/internals/DescribeProducersHandler.java | 7 +-
.../admin/internals/ListOffsetsHandler.java | 8 +-
.../admin/internals/PartitionLeaderStrategy.java | 92 ++++
.../PartitionLeaderStrategyIntegrationTest.java | 491 +++++++++++++++++++++
9 files changed, 638 insertions(+), 22 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7e8f42091fc..1b6d3efc8e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -58,6 +58,7 @@ import
org.apache.kafka.clients.admin.internals.FenceProducersHandler;
import
org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.ListOffsetsHandler;
import org.apache.kafka.clients.admin.internals.ListTransactionsHandler;
+import org.apache.kafka.clients.admin.internals.PartitionLeaderStrategy;
import
org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
@@ -408,6 +409,7 @@ public class KafkaAdminClient extends AdminClient {
private final ExponentialBackoff retryBackoff;
private final long rebootstrapTriggerMs;
private final MetadataRecoveryStrategy metadataRecoveryStrategy;
+ private final Map<TopicPartition, Integer> partitionLeaderCache;
private final AdminFetchMetricsManager adminFetchMetricsManager;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
@@ -635,6 +637,7 @@ public class KafkaAdminClient extends AdminClient {
this.clientTelemetryReporter.ifPresent(reporters::add);
this.rebootstrapTriggerMs =
config.getLong(AdminClientConfig.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG);
this.metadataRecoveryStrategy =
MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
+ this.partitionLeaderCache = new HashMap<>();
this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics,
time.milliseconds());
@@ -3359,7 +3362,8 @@ public class KafkaAdminClient extends AdminClient {
@Override
public DeleteRecordsResult deleteRecords(final Map<TopicPartition,
RecordsToDelete> recordsToDelete,
final DeleteRecordsOptions
options) {
- SimpleAdminApiFuture<TopicPartition, DeletedRecords> future =
DeleteRecordsHandler.newFuture(recordsToDelete.keySet());
+ PartitionLeaderStrategy.PartitionLeaderFuture<DeletedRecords> future =
+ DeleteRecordsHandler.newFuture(recordsToDelete.keySet(),
partitionLeaderCache);
int timeoutMs = defaultApiTimeoutMs;
if (options.timeoutMs() != null) {
timeoutMs = options.timeoutMs();
@@ -4372,8 +4376,8 @@ public class KafkaAdminClient extends AdminClient {
@Override
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec>
topicPartitionOffsets,
ListOffsetsOptions options) {
- AdminApiFuture.SimpleAdminApiFuture<TopicPartition,
ListOffsetsResultInfo> future =
- ListOffsetsHandler.newFuture(topicPartitionOffsets.keySet());
+ PartitionLeaderStrategy.PartitionLeaderFuture<ListOffsetsResultInfo>
future =
+ ListOffsetsHandler.newFuture(topicPartitionOffsets.keySet(),
partitionLeaderCache);
Map<TopicPartition, Long> offsetQueriesByPartition =
topicPartitionOffsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e ->
getOffsetFromSpec(e.getValue())));
ListOffsetsHandler handler = new
ListOffsetsHandler(offsetQueriesByPartition, options, logContext,
defaultApiTimeoutMs);
@@ -4918,8 +4922,8 @@ public class KafkaAdminClient extends AdminClient {
@Override
public DescribeProducersResult
describeProducers(Collection<TopicPartition> topicPartitions,
DescribeProducersOptions options) {
- AdminApiFuture.SimpleAdminApiFuture<TopicPartition,
DescribeProducersResult.PartitionProducerState> future =
- DescribeProducersHandler.newFuture(topicPartitions);
+
PartitionLeaderStrategy.PartitionLeaderFuture<DescribeProducersResult.PartitionProducerState>
future =
+ DescribeProducersHandler.newFuture(topicPartitions,
partitionLeaderCache);
DescribeProducersHandler handler = new
DescribeProducersHandler(options, logContext);
invokeDriver(handler, future, options.timeoutMs);
return new DescribeProducersResult(future.all());
@@ -4936,8 +4940,8 @@ public class KafkaAdminClient extends AdminClient {
@Override
public AbortTransactionResult abortTransaction(AbortTransactionSpec spec,
AbortTransactionOptions options) {
- AdminApiFuture.SimpleAdminApiFuture<TopicPartition, Void> future =
-
AbortTransactionHandler.newFuture(Collections.singleton(spec.topicPartition()));
+ PartitionLeaderStrategy.PartitionLeaderFuture<Void> future =
+
AbortTransactionHandler.newFuture(Collections.singleton(spec.topicPartition()),
partitionLeaderCache);
AbortTransactionHandler handler = new AbortTransactionHandler(spec,
logContext);
invokeDriver(handler, future, options.timeoutMs);
return new AbortTransactionResult(future.all());
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandler.java
index 0f5f4781080..f0b6d28be6b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandler.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import static java.util.Collections.singleton;
@@ -53,10 +54,11 @@ public class AbortTransactionHandler extends
AdminApiHandler.Batched<TopicPartit
this.lookupStrategy = new PartitionLeaderStrategy(logContext);
}
- public static AdminApiFuture.SimpleAdminApiFuture<TopicPartition, Void>
newFuture(
- Set<TopicPartition> topicPartitions
+ public static PartitionLeaderStrategy.PartitionLeaderFuture<Void>
newFuture(
+ Set<TopicPartition> topicPartitions,
+ Map<TopicPartition, Integer> partitionLeaderCache
) {
- return AdminApiFuture.forKeys(topicPartitions);
+ return new
PartitionLeaderStrategy.PartitionLeaderFuture<>(topicPartitions,
partitionLeaderCache);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
index 92e724e74f5..19e3f13ebef 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
@@ -110,7 +110,13 @@ public class AdminApiDriver<K, V> {
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
this.log = logContext.logger(AdminApiDriver.class);
- retryLookup(future.lookupKeys());
+
+ // For any lookup keys for which we do not have cached information, we
will need to look up
+ // metadata. For all cached keys, they can proceed straight to the
fulfillment map.
+ // Note that the cache is only used on the initial calls, and any
errors that result
+ // in additional lookups use the full set of lookup keys.
+ retryLookup(future.uncachedLookupKeys());
+ future.cachedKeyBrokerIdMapping().forEach((key, brokerId) ->
fulfillmentMap.put(new FulfillmentScope(brokerId), key));
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
index b0294d86166..322d116a3df 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
@@ -37,6 +38,25 @@ public interface AdminApiFuture<K, V> {
*/
Set<K> lookupKeys();
+ /**
+ * The set of request keys that do not have cached key-broker id mappings.
If there
+ * is no cached key mapping, this will be the same as the lookup keys.
+ * Can be empty, but only if the cached key mapping is not empty.
+ */
+ default Set<K> uncachedLookupKeys() {
+ return lookupKeys();
+ }
+
+ /**
+ * The cached key-broker id mapping. For lookup strategies that do not
make use of a
+ * cache of metadata, this will be empty.
+ *
+ * @return mapping of keys to broker ids
+ */
+ default Map<K, Integer> cachedKeyBrokerIdMapping() {
+ return Collections.emptyMap();
+ }
+
/**
* Complete the futures associated with the given keys.
*
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
index 836f2bc2a59..4afef617cb2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
-import
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -72,10 +71,11 @@ public final class DeleteRecordsHandler extends
Batched<TopicPartition, DeletedR
return this.lookupStrategy;
}
- public static SimpleAdminApiFuture<TopicPartition, DeletedRecords>
newFuture(
- Collection<TopicPartition> topicPartitions
+ public static
PartitionLeaderStrategy.PartitionLeaderFuture<DeletedRecords> newFuture(
+ Collection<TopicPartition> topicPartitions,
+ Map<TopicPartition, Integer> partitionLeaderCache
) {
- return AdminApiFuture.forKeys(new HashSet<>(topicPartitions));
+ return new PartitionLeaderStrategy.PartitionLeaderFuture<>(new
HashSet<>(topicPartitions), partitionLeaderCache);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
index e4b203545bd..84338feb9e4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
@@ -66,10 +66,11 @@ public class DescribeProducersHandler extends
AdminApiHandler.Batched<TopicParti
}
}
- public static AdminApiFuture.SimpleAdminApiFuture<TopicPartition,
PartitionProducerState> newFuture(
- Collection<TopicPartition> topicPartitions
+ public static
PartitionLeaderStrategy.PartitionLeaderFuture<PartitionProducerState> newFuture(
+ Collection<TopicPartition> topicPartitions,
+ Map<TopicPartition, Integer> partitionLeaderCache
) {
- return AdminApiFuture.forKeys(new HashSet<>(topicPartitions));
+ return new PartitionLeaderStrategy.PartitionLeaderFuture<>(new
HashSet<>(topicPartitions), partitionLeaderCache);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
index e36330a9924..f7c495d7fd8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
-import
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -217,9 +216,10 @@ public final class ListOffsetsHandler extends
Batched<TopicPartition, ListOffset
}
}
- public static SimpleAdminApiFuture<TopicPartition, ListOffsetsResultInfo>
newFuture(
- Collection<TopicPartition> topicPartitions
+ public static
PartitionLeaderStrategy.PartitionLeaderFuture<ListOffsetsResultInfo> newFuture(
+ Collection<TopicPartition> topicPartitions,
+ Map<TopicPartition, Integer> partitionLeaderCache
) {
- return AdminApiFuture.forKeys(new HashSet<>(topicPartitions));
+ return new PartitionLeaderStrategy.PartitionLeaderFuture<>(new
HashSet<>(topicPartitions), partitionLeaderCache);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java
index 9d52327b3c4..ff7dff2db8e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.clients.admin.internals;
+import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.protocol.Errors;
@@ -31,9 +33,11 @@ import org.slf4j.Logger;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* Base driver implementation for APIs which target partition leaders.
@@ -195,4 +199,92 @@ public class PartitionLeaderStrategy implements
AdminApiLookupStrategy<TopicPart
return new LookupResult<>(failed, mapped);
}
+ /**
+ * This subclass of {@link AdminApiFuture} starts with a pre-fetched map
for keys to broker ids which can be
+ * used to optimise the request. The map is kept up to date as metadata is
+ * fetched as this request is processed.
+ * This is useful for situations in which {@link PartitionLeaderStrategy}
is used
+ * repeatedly, such as a sequence of identical calls to
+ * {@link org.apache.kafka.clients.admin.Admin#listOffsets(Map,
org.apache.kafka.clients.admin.ListOffsetsOptions)}.
+ */
+ public static class PartitionLeaderFuture<V> implements
AdminApiFuture<TopicPartition, V> {
+ private final Set<TopicPartition> requestKeys;
+ private final Map<TopicPartition, Integer> partitionLeaderCache;
+ private final Map<TopicPartition, KafkaFuture<V>> futures;
+
+ public PartitionLeaderFuture(Set<TopicPartition> requestKeys,
Map<TopicPartition, Integer> partitionLeaderCache) {
+ this.requestKeys = requestKeys;
+ this.partitionLeaderCache = partitionLeaderCache;
+ this.futures =
requestKeys.stream().collect(Collectors.toUnmodifiableMap(
+ Function.identity(),
+ k -> new KafkaFutureImpl<>()
+ ));
+ }
+
+ @Override
+ public Set<TopicPartition> lookupKeys() {
+ return futures.keySet();
+ }
+
+ @Override
+ public Set<TopicPartition> uncachedLookupKeys() {
+ Set<TopicPartition> keys = new HashSet<>();
+ requestKeys.forEach(tp -> {
+ if (!partitionLeaderCache.containsKey(tp)) {
+ keys.add(tp);
+ }
+ });
+ return keys;
+ }
+
+ @Override
+ public Map<TopicPartition, Integer> cachedKeyBrokerIdMapping() {
+ Map<TopicPartition, Integer> mapping = new HashMap<>();
+ requestKeys.forEach(tp -> {
+ Integer brokerId = partitionLeaderCache.get(tp);
+ if (brokerId != null) {
+ mapping.put(tp, brokerId);
+ }
+ });
+ return mapping;
+ }
+
+ public Map<TopicPartition, KafkaFuture<V>> all() {
+ return futures;
+ }
+
+ @Override
+ public void complete(Map<TopicPartition, V> values) {
+ values.forEach(this::complete);
+ }
+
+ private void complete(TopicPartition key, V value) {
+ futureOrThrow(key).complete(value);
+ }
+
+ @Override
+ public void completeLookup(Map<TopicPartition, Integer>
brokerIdMapping) {
+ partitionLeaderCache.putAll(brokerIdMapping);
+ }
+
+ @Override
+ public void completeExceptionally(Map<TopicPartition, Throwable>
errors) {
+ errors.forEach(this::completeExceptionally);
+ }
+
+ private void completeExceptionally(TopicPartition key, Throwable t) {
+ partitionLeaderCache.remove(key);
+ futureOrThrow(key).completeExceptionally(t);
+ }
+
+ private KafkaFutureImpl<V> futureOrThrow(TopicPartition key) {
+ // The below typecast is safe because we initialise futures using
only KafkaFutureImpl.
+ KafkaFutureImpl<V> future = (KafkaFutureImpl<V>) futures.get(key);
+ if (future == null) {
+ throw new IllegalArgumentException("Attempt to complete future
for " + key +
+ ", which was not requested");
+ } else {
+ return future;
+ }
+ }
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategyIntegrationTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategyIntegrationTest.java
new file mode 100644
index 00000000000..4e03ae7d952
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategyIntegrationTest.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class PartitionLeaderStrategyIntegrationTest {
+ private static final long TIMEOUT_MS = 5000;
+ private static final long RETRY_BACKOFF_MS = 100;
+
+ private static final Node NODE_1 = new Node(1, "host1", 9092);
+ private static final Node NODE_2 = new Node(2, "host2", 9092);
+
+ private final LogContext logContext = new LogContext();
+ private final MockTime time = new MockTime();
+
+ private AdminApiDriver<TopicPartition, Void> buildDriver(
+ PartitionLeaderStrategy.PartitionLeaderFuture<Void> result
+ ) {
+ return new AdminApiDriver<>(
+ new MockApiHandler(),
+ result,
+ time.milliseconds() + TIMEOUT_MS,
+ RETRY_BACKOFF_MS,
+ RETRY_BACKOFF_MS,
+ logContext
+ );
+ }
+
+ @Test
+ public void testCachingRepeatedRequest() {
+ Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
+
+ TopicPartition tp0 = new TopicPartition("T", 0);
+ TopicPartition tp1 = new TopicPartition("T", 1);
+ Set<TopicPartition> requestKeys = Set.of(tp0, tp1);
+
+ // First, the lookup stage needs to obtain leadership data because the
cache is empty
+ PartitionLeaderStrategy.PartitionLeaderFuture<Void> result =
+ new PartitionLeaderStrategy.PartitionLeaderFuture<>(requestKeys,
partitionLeaderCache);
+ AdminApiDriver<TopicPartition, Void> driver = buildDriver(result);
+
+ List<AdminApiDriver.RequestSpec<TopicPartition>> requestSpecs =
driver.poll();
+ assertEquals(1, requestSpecs.size());
+
+ assertEquals(OptionalInt.empty(),
requestSpecs.get(0).scope.destinationBrokerId());
+ assertEquals(requestKeys, requestSpecs.get(0).keys);
+
+ // The cache will be populated using the leader information from this
metadata response
+ Map<TopicPartition, Integer> leaders = Map.of(tp0, 1, tp1, 2);
+ driver.onResponse(time.milliseconds(), requestSpecs.get(0),
metadataResponseWithPartitionLeaders(leaders), Node.noNode());
+ assertFalse(result.all().get(tp0).isDone());
+ assertFalse(result.all().get(tp1).isDone());
+
+ assertEquals(1, partitionLeaderCache.get(tp0));
+ assertEquals(2, partitionLeaderCache.get(tp1));
+
+ // Second, the fulfillment stage makes the actual requests
+ requestSpecs = driver.poll();
+ assertEquals(2, requestSpecs.size());
+
+ assertEquals(OptionalInt.of(1),
requestSpecs.get(0).scope.destinationBrokerId());
+ assertEquals(OptionalInt.of(2),
requestSpecs.get(1).scope.destinationBrokerId());
+
+ driver.onResponse(time.milliseconds(), requestSpecs.get(0),
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_1);
+ driver.onResponse(time.milliseconds(), requestSpecs.get(1),
listOffsetsResponseSuccess(requestSpecs.get(1).keys), NODE_2);
+ assertTrue(result.all().get(tp0).isDone());
+ assertTrue(result.all().get(tp1).isDone());
+
+ // On the second request, the partition leader cache already contains
all the leadership
+ // data so the request goes straight to the fulfillment stage
+ result = new
PartitionLeaderStrategy.PartitionLeaderFuture<>(requestKeys,
partitionLeaderCache);
+ driver = buildDriver(result);
+
+ requestSpecs = driver.poll();
+ assertEquals(2, requestSpecs.size());
+
+ // We can tell this is the fulfillment stage by the destination broker
id being set
+ assertEquals(OptionalInt.of(1),
requestSpecs.get(0).scope.destinationBrokerId());
+ assertEquals(OptionalInt.of(2),
requestSpecs.get(1).scope.destinationBrokerId());
+
+ driver.onResponse(time.milliseconds(), requestSpecs.get(0),
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_1);
+ driver.onResponse(time.milliseconds(), requestSpecs.get(1),
listOffsetsResponseSuccess(requestSpecs.get(1).keys), NODE_2);
+ assertTrue(result.all().get(tp0).isDone());
+ assertTrue(result.all().get(tp1).isDone());
+ }
+
+ @Test
+ public void testCachingOverlappingRequests() {
+ // This test uses several requests to exercise the caching in various
ways:
+ // 1) for T-0 and T-1 (initially the cache is empty)
+ // 2) for T-1 and T-2 (leadership data for T-1 should be
cached from previous request)
+ // 3) for T-0, T-1 and T-2 (all leadership data should be cached
already)
+ // 4) for T-0, T-1, T-2 and T-3 (just T-3 needs to be looked up)
+ Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
+
+ TopicPartition tp0 = new TopicPartition("T", 0);
+ TopicPartition tp1 = new TopicPartition("T", 1);
+ TopicPartition tp2 = new TopicPartition("T", 2);
+ TopicPartition tp3 = new TopicPartition("T", 3);
+
+ //
+ // Request 1 - T-0 and T-1
+ //
+ Set<TopicPartition> requestKeys = Set.of(tp0, tp1);
+
+ // First, the lookup stage needs to obtain leadership data because the
cache is empty
+ PartitionLeaderStrategy.PartitionLeaderFuture<Void> result =
+ new PartitionLeaderStrategy.PartitionLeaderFuture<>(requestKeys,
partitionLeaderCache);
+ AdminApiDriver<TopicPartition, Void> driver = buildDriver(result);
+
+ List<AdminApiDriver.RequestSpec<TopicPartition>> requestSpecs =
driver.poll();
+ assertEquals(1, requestSpecs.size());
+
+ assertEquals(OptionalInt.empty(),
requestSpecs.get(0).scope.destinationBrokerId());
+ assertEquals(requestKeys, requestSpecs.get(0).keys);
+
+ // The cache will be populated using the leader information from this
metadata response
+ Map<TopicPartition, Integer> leaders = Map.of(tp0, 1, tp1, 2);
+ driver.onResponse(time.milliseconds(), requestSpecs.get(0),
metadataResponseWithPartitionLeaders(leaders), Node.noNode());
+ assertFalse(result.all().get(tp0).isDone());
+ assertFalse(result.all().get(tp1).isDone());
+
+ assertEquals(1, partitionLeaderCache.get(tp0));
+ assertEquals(2, partitionLeaderCache.get(tp1));
+
+ // Second, the fulfillment stage makes the actual requests
+ requestSpecs = driver.poll();
+ assertEquals(2, requestSpecs.size());
+
+ assertEquals(OptionalInt.of(1),
requestSpecs.get(0).scope.destinationBrokerId());
+ assertEquals(OptionalInt.of(2),
requestSpecs.get(1).scope.destinationBrokerId());
+
+ driver.onResponse(time.milliseconds(), requestSpecs.get(0),
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_1);
+ driver.onResponse(time.milliseconds(), requestSpecs.get(1),
listOffsetsResponseSuccess(requestSpecs.get(1).keys), NODE_2);
+ assertTrue(result.all().get(tp0).isDone());
+ assertTrue(result.all().get(tp1).isDone());
+
+ //
+ // Request 2 - T-1 and T-2
+ //
+ // On the second request, the partition leader cache already contains
some of the leadership data.
+ // Now the lookup and fulfillment stages overlap.
+ requestKeys = Set.of(tp1, tp2);
+ result = new
PartitionLeaderStrategy.PartitionLeaderFuture<>(requestKeys,
partitionLeaderCache);
+ driver = buildDriver(result);
+
+ requestSpecs = driver.poll();
+ assertEquals(2, requestSpecs.size());
+
+ assertEquals(OptionalInt.empty(),
requestSpecs.get(0).scope.destinationBrokerId());
+ assertEquals(Collections.singleton(tp2), requestSpecs.get(0).keys);
+ assertEquals(OptionalInt.of(2),
requestSpecs.get(1).scope.destinationBrokerId());
+
+ // The cache will be populated using the leader information from this
metadata response
+ leaders = Map.of(tp2, 1);
+ driver.onResponse(time.milliseconds(), requestSpecs.get(0),
metadataResponseWithPartitionLeaders(leaders), Node.noNode());
+ driver.onResponse(time.milliseconds(), requestSpecs.get(1),
listOffsetsResponseSuccess(requestSpecs.get(1).keys), NODE_2);
+ assertTrue(result.all().get(tp1).isDone()); // Already fulfilled
+ assertFalse(result.all().get(tp2).isDone());
+
+ assertEquals(1, partitionLeaderCache.get(tp0));
+ assertEquals(2, partitionLeaderCache.get(tp1));
+ assertEquals(1, partitionLeaderCache.get(tp2));
+
+ // Finally, the fulfillment stage makes the actual request for the
uncached topic-partition
+ requestSpecs = driver.poll();
+ assertEquals(1, requestSpecs.size());
+
+ assertEquals(OptionalInt.of(1),
requestSpecs.get(0).scope.destinationBrokerId());
+
+ driver.onResponse(time.milliseconds(), requestSpecs.get(0),
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_1);
+ assertTrue(result.all().get(tp1).isDone());
+ assertTrue(result.all().get(tp2).isDone());
+
+ //
+ // Request 3 - T-0, T-1 and T-2
+ //
+ // On the third request, the partition leader cache contains all the
leadership data
+ requestKeys = Set.of(tp0, tp1, tp2);
+ result = new
PartitionLeaderStrategy.PartitionLeaderFuture<>(requestKeys,
partitionLeaderCache);
+ driver = buildDriver(result);
+
+ requestSpecs = driver.poll();
+ assertEquals(2, requestSpecs.size());
+
+ // We can tell this is the fulfillment stage by the destination broker
id being set
+ assertEquals(OptionalInt.of(1),
requestSpecs.get(0).scope.destinationBrokerId());
+ assertEquals(OptionalInt.of(2),
requestSpecs.get(1).scope.destinationBrokerId());
+
+ driver.onResponse(time.milliseconds(), requestSpecs.get(0),
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_1);
+ driver.onResponse(time.milliseconds(), requestSpecs.get(1),
listOffsetsResponseSuccess(requestSpecs.get(1).keys), NODE_2);
+ assertTrue(result.all().get(tp0).isDone());
+ assertTrue(result.all().get(tp1).isDone());
+ assertTrue(result.all().get(tp2).isDone());
+
+ //
+ // Request 4 - T-0, T-1, T-2 and T-3
+ //
+ // On the fourth request, the partition leader cache already contains
some of the leadership data.
+ // Now the lookup and fulfillment stages overlap.
+ requestKeys = Set.of(tp0, tp1, tp2, tp3);
+ result = new
PartitionLeaderStrategy.PartitionLeaderFuture<>(requestKeys,
partitionLeaderCache);
+ driver = buildDriver(result);
+
+ requestSpecs = driver.poll();
+ assertEquals(3, requestSpecs.size());
+
+ assertEquals(OptionalInt.empty(),
requestSpecs.get(0).scope.destinationBrokerId());
+ assertEquals(Collections.singleton(tp3), requestSpecs.get(0).keys);
+ assertEquals(OptionalInt.of(1),
requestSpecs.get(1).scope.destinationBrokerId());
+ assertEquals(OptionalInt.of(2),
requestSpecs.get(2).scope.destinationBrokerId());
+
+ // The cache will be populated using the leader information from this
metadata response
+ leaders = Map.of(tp3, 2);
+ driver.onResponse(time.milliseconds(), requestSpecs.get(0),
metadataResponseWithPartitionLeaders(leaders), Node.noNode());
+ driver.onResponse(time.milliseconds(), requestSpecs.get(1),
listOffsetsResponseSuccess(requestSpecs.get(1).keys), NODE_1);
+ driver.onResponse(time.milliseconds(), requestSpecs.get(2),
listOffsetsResponseSuccess(requestSpecs.get(2).keys), NODE_2);
+ assertTrue(result.all().get(tp0).isDone()); // Already fulfilled
+ assertTrue(result.all().get(tp1).isDone()); // Already fulfilled
+ assertTrue(result.all().get(tp2).isDone()); // Already fulfilled
+ assertFalse(result.all().get(tp3).isDone());
+
+ assertEquals(1, partitionLeaderCache.get(tp0));
+ assertEquals(2, partitionLeaderCache.get(tp1));
+ assertEquals(1, partitionLeaderCache.get(tp2));
+ assertEquals(2, partitionLeaderCache.get(tp3));
+
+ // Finally, the fulfillment stage makes the actual request for the
uncached topic-partition
+ requestSpecs = driver.poll();
+ assertEquals(1, requestSpecs.size());
+
+ assertEquals(OptionalInt.of(2),
requestSpecs.get(0).scope.destinationBrokerId());
+
+ driver.onResponse(time.milliseconds(), requestSpecs.get(0),
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_2);
+ assertTrue(result.all().get(tp0).isDone());
+ assertTrue(result.all().get(tp1).isDone());
+ assertTrue(result.all().get(tp2).isDone());
+ assertTrue(result.all().get(tp3).isDone());
+ }
+
+ @Test
+ public void testNotLeaderFulfillmentError() {
+ Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
+
+ TopicPartition tp0 = new TopicPartition("T", 0);
+ TopicPartition tp1 = new TopicPartition("T", 1);
+ Set<TopicPartition> requestKeys = Set.of(tp0, tp1);
+
+ // First, the lookup stage needs to obtain leadership data because the
cache is empty
+ PartitionLeaderStrategy.PartitionLeaderFuture<Void> result =
+ new PartitionLeaderStrategy.PartitionLeaderFuture<>(requestKeys,
partitionLeaderCache);
+ AdminApiDriver<TopicPartition, Void> driver = buildDriver(result);
+
+ List<AdminApiDriver.RequestSpec<TopicPartition>> requestSpecs =
driver.poll();
+ assertEquals(1, requestSpecs.size());
+
+ assertEquals(OptionalInt.empty(),
requestSpecs.get(0).scope.destinationBrokerId());
+ assertEquals(requestKeys, requestSpecs.get(0).keys);
+
+ // The cache will be populated using the leader information from this
metadata response
+ Map<TopicPartition, Integer> leaders = Map.of(tp0, 1, tp1, 2);
+ driver.onResponse(time.milliseconds(), requestSpecs.get(0),
metadataResponseWithPartitionLeaders(leaders), Node.noNode());
+ assertFalse(result.all().get(tp0).isDone());
+ assertFalse(result.all().get(tp1).isDone());
+
+ assertEquals(1, partitionLeaderCache.get(tp0));
+ assertEquals(2, partitionLeaderCache.get(tp1));
+
+ // Second, the fulfillment stage makes the actual requests
+ requestSpecs = driver.poll();
+ assertEquals(2, requestSpecs.size());
+
+ assertEquals(OptionalInt.of(1),
requestSpecs.get(0).scope.destinationBrokerId());
+ assertEquals(OptionalInt.of(2),
requestSpecs.get(1).scope.destinationBrokerId());
+
+ driver.onResponse(time.milliseconds(), requestSpecs.get(0),
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_1);
+ driver.onResponse(time.milliseconds(), requestSpecs.get(1),
listOffsetsResponseFailure(requestSpecs.get(1).keys,
Errors.NOT_LEADER_OR_FOLLOWER), NODE_2);
+ assertTrue(result.all().get(tp0).isDone());
+ assertFalse(result.all().get(tp1).isDone());
+
+ // Now the lookup occurs again - change leadership to node 1
+ requestSpecs = driver.poll();
+ assertEquals(1, requestSpecs.size());
+
+ assertEquals(OptionalInt.empty(),
requestSpecs.get(0).scope.destinationBrokerId());
+
+ leaders = Map.of(tp1, 1);
+ driver.onResponse(time.milliseconds(), requestSpecs.get(0),
metadataResponseWithPartitionLeaders(leaders), Node.noNode());
+ assertTrue(result.all().get(tp0).isDone());
+ assertFalse(result.all().get(tp1).isDone());
+
+ assertEquals(1, partitionLeaderCache.get(tp0));
+ assertEquals(1, partitionLeaderCache.get(tp1));
+
+ // And the fulfillment stage makes the actual request
+ requestSpecs = driver.poll();
+ assertEquals(1, requestSpecs.size());
+
+ assertEquals(OptionalInt.of(1),
requestSpecs.get(0).scope.destinationBrokerId());
+
+ driver.onResponse(time.milliseconds(), requestSpecs.get(0),
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_1);
+ assertTrue(result.all().get(tp0).isDone());
+ assertTrue(result.all().get(tp1).isDone());
+ }
+
+ @Test
+ public void testFatalLookupError() {
+ TopicPartition tp0 = new TopicPartition("T", 0);
+ Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
+ PartitionLeaderStrategy.PartitionLeaderFuture<Void> result =
+ new
PartitionLeaderStrategy.PartitionLeaderFuture<>(Collections.singleton(tp0),
partitionLeaderCache);
+ AdminApiDriver<TopicPartition, Void> driver = buildDriver(result);
+
+ List<AdminApiDriver.RequestSpec<TopicPartition>> requestSpecs =
driver.poll();
+ assertEquals(1, requestSpecs.size());
+
+ AdminApiDriver.RequestSpec<TopicPartition> spec = requestSpecs.get(0);
+ assertEquals(Collections.singleton(tp0), spec.keys);
+
+ driver.onFailure(time.milliseconds(), spec, new
UnknownServerException());
+ assertTrue(result.all().get(tp0).isDone());
+ TestUtils.assertFutureThrows(result.all().get(tp0),
UnknownServerException.class);
+ assertEquals(Collections.emptyList(), driver.poll());
+ }
+
+ @Test
+ public void testRetryLookupAfterDisconnect() {
+ TopicPartition tp0 = new TopicPartition("T", 0);
+ Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
+ PartitionLeaderStrategy.PartitionLeaderFuture<Void> result =
+ new
PartitionLeaderStrategy.PartitionLeaderFuture<>(Collections.singleton(tp0),
partitionLeaderCache);
+ AdminApiDriver<TopicPartition, Void> driver = buildDriver(result);
+
+ List<AdminApiDriver.RequestSpec<TopicPartition>> requestSpecs =
driver.poll();
+ assertEquals(1, requestSpecs.size());
+
+ AdminApiDriver.RequestSpec<TopicPartition> spec = requestSpecs.get(0);
+ assertEquals(Collections.singleton(tp0), spec.keys);
+
+ driver.onFailure(time.milliseconds(), spec, new DisconnectException());
+ List<AdminApiDriver.RequestSpec<TopicPartition>> retrySpecs =
driver.poll();
+ assertEquals(1, retrySpecs.size());
+
+ AdminApiDriver.RequestSpec<TopicPartition> retrySpec =
retrySpecs.get(0);
+ assertEquals(Collections.singleton(tp0), retrySpec.keys);
+ assertEquals(time.milliseconds(), retrySpec.nextAllowedTryMs);
+ assertEquals(Collections.emptyList(), driver.poll());
+ }
+
+ private MetadataResponse
metadataResponseWithPartitionLeaders(Map<TopicPartition, Integer> mapping) {
+ MetadataResponseData response = new MetadataResponseData();
+ mapping.forEach((tp, brokerId) -> response.topics().add(new
MetadataResponseData.MetadataResponseTopic()
+ .setName(tp.topic())
+ .setPartitions(Collections.singletonList(new
MetadataResponseData.MetadataResponsePartition()
+ .setPartitionIndex(tp.partition())
+ .setLeaderId(brokerId)))));
+ return new MetadataResponse(response,
ApiKeys.METADATA.latestVersion());
+ }
+
+ private ListOffsetsResponse listOffsetsResponseSuccess(Set<TopicPartition>
keys) {
+ // This structure is not quite how Kafka does it, but it works for the
MockApiHandler
+ ListOffsetsResponseData response = new ListOffsetsResponseData();
+ keys.forEach(tp -> {
+ ListOffsetsResponseData.ListOffsetsPartitionResponse partResponse =
+ new ListOffsetsResponseData.ListOffsetsPartitionResponse()
+ .setPartitionIndex(tp.partition());
+ ListOffsetsResponseData.ListOffsetsTopicResponse topicResponse =
+ new ListOffsetsResponseData.ListOffsetsTopicResponse()
+ .setName(tp.topic())
+ .setPartitions(Collections.singletonList(partResponse));
+ response.topics().add(topicResponse);
+ });
+ return new ListOffsetsResponse(response);
+ }
+
+ private ListOffsetsResponse listOffsetsResponseFailure(Set<TopicPartition>
keys, Errors error) {
+ // This structure is not quite how Kafka does it, but it works for the
MockApiHandler
+ ListOffsetsResponseData response = new ListOffsetsResponseData();
+ keys.forEach(tp -> {
+ ListOffsetsResponseData.ListOffsetsPartitionResponse partResponse =
+ new ListOffsetsResponseData.ListOffsetsPartitionResponse()
+ .setPartitionIndex(tp.partition())
+ .setErrorCode(error.code());
+ ListOffsetsResponseData.ListOffsetsTopicResponse topicResponse =
+ new ListOffsetsResponseData.ListOffsetsTopicResponse()
+ .setName(tp.topic())
+ .setPartitions(Collections.singletonList(partResponse));
+ response.topics().add(topicResponse);
+ });
+ return new ListOffsetsResponse(response);
+ }
+
+ private class MockApiHandler extends
AdminApiHandler.Batched<TopicPartition, Void> {
+ private final PartitionLeaderStrategy partitionLeaderStrategy = new
PartitionLeaderStrategy(logContext);
+
+ @Override
+ public String apiName() {
+ return "mock-api";
+ }
+
+ @Override
+ public AbstractRequest.Builder<?> buildBatchedRequest(
+ int brokerId,
+ Set<TopicPartition> keys
+ ) {
+ return new MetadataRequest.Builder(new MetadataRequestData());
+ }
+
+ @Override
+ public ApiResult<TopicPartition, Void> handleResponse(
+ Node broker,
+ Set<TopicPartition> keys,
+ AbstractResponse abstractResponse
+ ) {
+ ListOffsetsResponse response = (ListOffsetsResponse)
abstractResponse;
+
+ Map<TopicPartition, Void> completed = new HashMap<>();
+ Map<TopicPartition, Throwable> failed = new HashMap<>();
+ List<TopicPartition> unmapped = new ArrayList<>();
+
+ response.topics().forEach(topic ->
topic.partitions().forEach(partition -> {
+ TopicPartition tp = new TopicPartition(topic.name(),
partition.partitionIndex());
+ if (partition.errorCode() != Errors.NONE.code()) {
+ Exception exception =
Errors.forCode(partition.errorCode()).exception();
+ if (exception instanceof NotLeaderOrFollowerException ||
exception instanceof LeaderNotAvailableException) {
+ unmapped.add(tp);
+ } else if (!(exception instanceof RetriableException)) {
+ failed.put(tp,
Errors.forCode(partition.errorCode()).exception());
+ }
+ } else {
+ completed.put(tp, null);
+ }
+ }));
+
+ return new ApiResult<>(completed, failed, unmapped);
+ }
+
+ @Override
+ public PartitionLeaderStrategy lookupStrategy() {
+ return partitionLeaderStrategy;
+ }
+ }
+}