This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a592912ec94 KAFKA-17663 Add metadata caching in 
PartitionLeaderStrategy (#17367)
a592912ec94 is described below

commit a592912ec94b021a2bd7485abb9ebda02dc01766
Author: Andrew Schofield <[email protected]>
AuthorDate: Mon Nov 18 06:45:06 2024 +0000

    KAFKA-17663 Add metadata caching in PartitionLeaderStrategy (#17367)
    
    Admin API operations have two phases: lookup and fulfilment. The lookup 
phase involves a METADATA request whose details depend upon the operation being 
performed.
    
    For some operations, the METADATA request can be quite expensive to serve. 
For example, if the user calls Admin.listOffsets for 1000 topics, the METADATA 
request will include all 1000 topics and the response will contain the leader 
information for all of these topics. And then the actual fulfilment phase does 
the real work of the operation.
    
    In cases where a long-running application is performing repeated admin 
operations which need the same metadata information about partition leadership, 
it is not necessary to send the METADATA request for every single admin 
operation.
    
    This PR adds a cache of the mapping from topic-partition to leader id to 
the admin client. The cache doesn't need to be very sophisticated because the 
admin client will retry if the information becomes stale, and the cache can be 
updated as a result of the retry.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  18 +-
 .../admin/internals/AbortTransactionHandler.java   |   8 +-
 .../clients/admin/internals/AdminApiDriver.java    |   8 +-
 .../clients/admin/internals/AdminApiFuture.java    |  20 +
 .../admin/internals/DeleteRecordsHandler.java      |   8 +-
 .../admin/internals/DescribeProducersHandler.java  |   7 +-
 .../admin/internals/ListOffsetsHandler.java        |   8 +-
 .../admin/internals/PartitionLeaderStrategy.java   |  92 ++++
 .../PartitionLeaderStrategyIntegrationTest.java    | 491 +++++++++++++++++++++
 9 files changed, 638 insertions(+), 22 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7e8f42091fc..1b6d3efc8e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -58,6 +58,7 @@ import 
org.apache.kafka.clients.admin.internals.FenceProducersHandler;
 import 
org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
 import org.apache.kafka.clients.admin.internals.ListOffsetsHandler;
 import org.apache.kafka.clients.admin.internals.ListTransactionsHandler;
+import org.apache.kafka.clients.admin.internals.PartitionLeaderStrategy;
 import 
org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
@@ -408,6 +409,7 @@ public class KafkaAdminClient extends AdminClient {
     private final ExponentialBackoff retryBackoff;
     private final long rebootstrapTriggerMs;
     private final MetadataRecoveryStrategy metadataRecoveryStrategy;
+    private final Map<TopicPartition, Integer> partitionLeaderCache;
     private final AdminFetchMetricsManager adminFetchMetricsManager;
     private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
 
@@ -635,6 +637,7 @@ public class KafkaAdminClient extends AdminClient {
         this.clientTelemetryReporter.ifPresent(reporters::add);
         this.rebootstrapTriggerMs = 
config.getLong(AdminClientConfig.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG);
         this.metadataRecoveryStrategy = 
MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
+        this.partitionLeaderCache = new HashMap<>();
         this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);
         config.logUnused();
         AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, 
time.milliseconds());
@@ -3359,7 +3362,8 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public DeleteRecordsResult deleteRecords(final Map<TopicPartition, 
RecordsToDelete> recordsToDelete,
                                              final DeleteRecordsOptions 
options) {
-        SimpleAdminApiFuture<TopicPartition, DeletedRecords> future = 
DeleteRecordsHandler.newFuture(recordsToDelete.keySet());
+        PartitionLeaderStrategy.PartitionLeaderFuture<DeletedRecords> future =
+            DeleteRecordsHandler.newFuture(recordsToDelete.keySet(), 
partitionLeaderCache);
         int timeoutMs = defaultApiTimeoutMs;
         if (options.timeoutMs() != null) {
             timeoutMs = options.timeoutMs();
@@ -4372,8 +4376,8 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> 
topicPartitionOffsets,
                                          ListOffsetsOptions options) {
-        AdminApiFuture.SimpleAdminApiFuture<TopicPartition, 
ListOffsetsResultInfo> future =
-            ListOffsetsHandler.newFuture(topicPartitionOffsets.keySet());
+        PartitionLeaderStrategy.PartitionLeaderFuture<ListOffsetsResultInfo> 
future =
+            ListOffsetsHandler.newFuture(topicPartitionOffsets.keySet(), 
partitionLeaderCache);
         Map<TopicPartition, Long> offsetQueriesByPartition = 
topicPartitionOffsets.entrySet().stream()
             .collect(Collectors.toMap(Map.Entry::getKey, e -> 
getOffsetFromSpec(e.getValue())));
         ListOffsetsHandler handler = new 
ListOffsetsHandler(offsetQueriesByPartition, options, logContext, 
defaultApiTimeoutMs);
@@ -4918,8 +4922,8 @@ public class KafkaAdminClient extends AdminClient {
 
     @Override
     public DescribeProducersResult 
describeProducers(Collection<TopicPartition> topicPartitions, 
DescribeProducersOptions options) {
-        AdminApiFuture.SimpleAdminApiFuture<TopicPartition, 
DescribeProducersResult.PartitionProducerState> future =
-            DescribeProducersHandler.newFuture(topicPartitions);
+        
PartitionLeaderStrategy.PartitionLeaderFuture<DescribeProducersResult.PartitionProducerState>
 future =
+            DescribeProducersHandler.newFuture(topicPartitions, 
partitionLeaderCache);
         DescribeProducersHandler handler = new 
DescribeProducersHandler(options, logContext);
         invokeDriver(handler, future, options.timeoutMs);
         return new DescribeProducersResult(future.all());
@@ -4936,8 +4940,8 @@ public class KafkaAdminClient extends AdminClient {
 
     @Override
     public AbortTransactionResult abortTransaction(AbortTransactionSpec spec, 
AbortTransactionOptions options) {
-        AdminApiFuture.SimpleAdminApiFuture<TopicPartition, Void> future =
-            
AbortTransactionHandler.newFuture(Collections.singleton(spec.topicPartition()));
+        PartitionLeaderStrategy.PartitionLeaderFuture<Void> future =
+            
AbortTransactionHandler.newFuture(Collections.singleton(spec.topicPartition()), 
partitionLeaderCache);
         AbortTransactionHandler handler = new AbortTransactionHandler(spec, 
logContext);
         invokeDriver(handler, future, options.timeoutMs);
         return new AbortTransactionResult(future.all());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandler.java
index 0f5f4781080..f0b6d28be6b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandler.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static java.util.Collections.singleton;
@@ -53,10 +54,11 @@ public class AbortTransactionHandler extends 
AdminApiHandler.Batched<TopicPartit
         this.lookupStrategy = new PartitionLeaderStrategy(logContext);
     }
 
-    public static AdminApiFuture.SimpleAdminApiFuture<TopicPartition, Void> 
newFuture(
-        Set<TopicPartition> topicPartitions
+    public static PartitionLeaderStrategy.PartitionLeaderFuture<Void> 
newFuture(
+        Set<TopicPartition> topicPartitions,
+        Map<TopicPartition, Integer> partitionLeaderCache
     ) {
-        return AdminApiFuture.forKeys(topicPartitions);
+        return new 
PartitionLeaderStrategy.PartitionLeaderFuture<>(topicPartitions, 
partitionLeaderCache);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
index 92e724e74f5..19e3f13ebef 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
@@ -110,7 +110,13 @@ public class AdminApiDriver<K, V> {
             retryBackoffMaxMs,
             CommonClientConfigs.RETRY_BACKOFF_JITTER);
         this.log = logContext.logger(AdminApiDriver.class);
-        retryLookup(future.lookupKeys());
+
+        // For any lookup keys for which we do not have cached information, we 
will need to look up
+        // metadata. All cached keys can proceed straight to the 
fulfillment map.
+        // Note that the cache is only used on the initial calls, and any 
errors that result
+        // in additional lookups use the full set of lookup keys.
+        retryLookup(future.uncachedLookupKeys());
+        future.cachedKeyBrokerIdMapping().forEach((key, brokerId) -> 
fulfillmentMap.put(new FulfillmentScope(brokerId), key));
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
index b0294d86166..322d116a3df 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin.internals;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
@@ -37,6 +38,25 @@ public interface AdminApiFuture<K, V> {
      */
     Set<K> lookupKeys();
 
+    /**
+     * The set of request keys that do not have cached key-broker id mappings. 
If there
+     * is no cached key mapping, this will be the same as the lookup keys.
+     * Can be empty, but only if the cached key mapping is not empty.
+     */
+    default Set<K> uncachedLookupKeys() {
+        return lookupKeys();
+    }
+
+    /**
+     * The cached key-broker id mapping. For lookup strategies that do not 
make use of a
+     * cache of metadata, this will be empty.
+     *
+     * @return mapping of keys to broker ids
+     */
+    default Map<K, Integer> cachedKeyBrokerIdMapping() {
+        return Collections.emptyMap();
+    }
+
     /**
      * Complete the futures associated with the given keys.
      *
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
index 836f2bc2a59..4afef617cb2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.admin.internals;
 
 import org.apache.kafka.clients.admin.DeletedRecords;
 import org.apache.kafka.clients.admin.RecordsToDelete;
-import 
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
 import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -72,10 +71,11 @@ public final class DeleteRecordsHandler extends 
Batched<TopicPartition, DeletedR
         return this.lookupStrategy;
     }
 
-    public static SimpleAdminApiFuture<TopicPartition, DeletedRecords> 
newFuture(
-            Collection<TopicPartition> topicPartitions
+    public static 
PartitionLeaderStrategy.PartitionLeaderFuture<DeletedRecords> newFuture(
+            Collection<TopicPartition> topicPartitions,
+            Map<TopicPartition, Integer> partitionLeaderCache
     ) {
-        return AdminApiFuture.forKeys(new HashSet<>(topicPartitions));
+        return new PartitionLeaderStrategy.PartitionLeaderFuture<>(new 
HashSet<>(topicPartitions), partitionLeaderCache);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
index e4b203545bd..84338feb9e4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
@@ -66,10 +66,11 @@ public class DescribeProducersHandler extends 
AdminApiHandler.Batched<TopicParti
         }
     }
 
-    public static AdminApiFuture.SimpleAdminApiFuture<TopicPartition, 
PartitionProducerState> newFuture(
-        Collection<TopicPartition> topicPartitions
+    public static 
PartitionLeaderStrategy.PartitionLeaderFuture<PartitionProducerState> newFuture(
+        Collection<TopicPartition> topicPartitions,
+        Map<TopicPartition, Integer> partitionLeaderCache
     ) {
-        return AdminApiFuture.forKeys(new HashSet<>(topicPartitions));
+        return new PartitionLeaderStrategy.PartitionLeaderFuture<>(new 
HashSet<>(topicPartitions), partitionLeaderCache);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
index e36330a9924..f7c495d7fd8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.admin.internals;
 
 import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
-import 
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
 import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -217,9 +216,10 @@ public final class ListOffsetsHandler extends 
Batched<TopicPartition, ListOffset
         }
     }
 
-    public static SimpleAdminApiFuture<TopicPartition, ListOffsetsResultInfo> 
newFuture(
-        Collection<TopicPartition> topicPartitions
+    public static 
PartitionLeaderStrategy.PartitionLeaderFuture<ListOffsetsResultInfo> newFuture(
+        Collection<TopicPartition> topicPartitions,
+        Map<TopicPartition, Integer> partitionLeaderCache
     ) {
-        return AdminApiFuture.forKeys(new HashSet<>(topicPartitions));
+        return new PartitionLeaderStrategy.PartitionLeaderFuture<>(new 
HashSet<>(topicPartitions), partitionLeaderCache);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java
index 9d52327b3c4..ff7dff2db8e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.clients.admin.internals;
 
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.protocol.Errors;
@@ -31,9 +33,11 @@ import org.slf4j.Logger;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Base driver implementation for APIs which target partition leaders.
@@ -195,4 +199,92 @@ public class PartitionLeaderStrategy implements 
AdminApiLookupStrategy<TopicPart
         return new LookupResult<>(failed, mapped);
     }
 
+    /**
+     * This subclass of {@link AdminApiFuture} starts with a pre-fetched map 
from keys to broker ids which can be
+     * used to optimise the request. The map is kept up to date as metadata is 
fetched while this request is processed.
+     * This is useful for situations in which {@link PartitionLeaderStrategy} 
is used
+     * repeatedly, such as a sequence of identical calls to
+     * {@link org.apache.kafka.clients.admin.Admin#listOffsets(Map, 
org.apache.kafka.clients.admin.ListOffsetsOptions)}.
+     */
+    public static class PartitionLeaderFuture<V> implements 
AdminApiFuture<TopicPartition, V> {
+        private final Set<TopicPartition> requestKeys;
+        private final Map<TopicPartition, Integer> partitionLeaderCache;
+        private final Map<TopicPartition, KafkaFuture<V>> futures;
+
+        public PartitionLeaderFuture(Set<TopicPartition> requestKeys, 
Map<TopicPartition, Integer> partitionLeaderCache) {
+            this.requestKeys = requestKeys;
+            this.partitionLeaderCache = partitionLeaderCache;
+            this.futures = 
requestKeys.stream().collect(Collectors.toUnmodifiableMap(
+                Function.identity(),
+                k -> new KafkaFutureImpl<>()
+            ));
+        }
+
+        @Override
+        public Set<TopicPartition> lookupKeys() {
+            return futures.keySet();
+        }
+
+        @Override
+        public Set<TopicPartition> uncachedLookupKeys() {
+            Set<TopicPartition> keys = new HashSet<>();
+            requestKeys.forEach(tp -> {
+                if (!partitionLeaderCache.containsKey(tp)) {
+                    keys.add(tp);
+                }
+            });
+            return keys;
+        }
+
+        @Override
+        public Map<TopicPartition, Integer> cachedKeyBrokerIdMapping() {
+            Map<TopicPartition, Integer> mapping = new HashMap<>();
+            requestKeys.forEach(tp -> {
+                Integer brokerId = partitionLeaderCache.get(tp);
+                if (brokerId != null) {
+                    mapping.put(tp, brokerId);
+                }
+            });
+            return mapping;
+        }
+
+        public Map<TopicPartition, KafkaFuture<V>> all() {
+            return futures;
+        }
+
+        @Override
+        public void complete(Map<TopicPartition, V> values) {
+            values.forEach(this::complete);
+        }
+
+        private void complete(TopicPartition key, V value) {
+            futureOrThrow(key).complete(value);
+        }
+
+        @Override
+        public void completeLookup(Map<TopicPartition, Integer> 
brokerIdMapping) {
+            partitionLeaderCache.putAll(brokerIdMapping);
+        }
+
+        @Override
+        public void completeExceptionally(Map<TopicPartition, Throwable> 
errors) {
+            errors.forEach(this::completeExceptionally);
+        }
+
+        private void completeExceptionally(TopicPartition key, Throwable t) {
+            partitionLeaderCache.remove(key);
+            futureOrThrow(key).completeExceptionally(t);
+        }
+
+        private KafkaFutureImpl<V> futureOrThrow(TopicPartition key) {
+            // The below typecast is safe because we initialise futures using 
only KafkaFutureImpl.
+            KafkaFutureImpl<V> future = (KafkaFutureImpl<V>) futures.get(key);
+            if (future == null) {
+                throw new IllegalArgumentException("Attempt to complete future 
for " + key +
+                    ", which was not requested");
+            } else {
+                return future;
+            }
+        }
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategyIntegrationTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategyIntegrationTest.java
new file mode 100644
index 00000000000..4e03ae7d952
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategyIntegrationTest.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class PartitionLeaderStrategyIntegrationTest {
+    private static final long TIMEOUT_MS = 5000;
+    private static final long RETRY_BACKOFF_MS = 100;
+
+    private static final Node NODE_1 = new Node(1, "host1", 9092);
+    private static final Node NODE_2 = new Node(2, "host2", 9092);
+
+    private final LogContext logContext = new LogContext();
+    private final MockTime time = new MockTime();
+
+    private AdminApiDriver<TopicPartition, Void> buildDriver(
+        PartitionLeaderStrategy.PartitionLeaderFuture<Void> result
+    ) {
+        return new AdminApiDriver<>(
+            new MockApiHandler(),
+            result,
+            time.milliseconds() + TIMEOUT_MS,
+            RETRY_BACKOFF_MS,
+            RETRY_BACKOFF_MS,
+            logContext
+        );
+    }
+
+    @Test
+    public void testCachingRepeatedRequest() {
+        Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
+
+        TopicPartition tp0 = new TopicPartition("T", 0);
+        TopicPartition tp1 = new TopicPartition("T", 1);
+        Set<TopicPartition> requestKeys = Set.of(tp0, tp1);
+
+        // First, the lookup stage needs to obtain leadership data because the 
cache is empty
+        PartitionLeaderStrategy.PartitionLeaderFuture<Void> result =
+            new PartitionLeaderStrategy.PartitionLeaderFuture<>(requestKeys, 
partitionLeaderCache);
+        AdminApiDriver<TopicPartition, Void> driver = buildDriver(result);
+
+        List<AdminApiDriver.RequestSpec<TopicPartition>> requestSpecs = 
driver.poll();
+        assertEquals(1, requestSpecs.size());
+
+        assertEquals(OptionalInt.empty(), 
requestSpecs.get(0).scope.destinationBrokerId());
+        assertEquals(requestKeys, requestSpecs.get(0).keys);
+
+        // The cache will be populated using the leader information from this 
metadata response
+        Map<TopicPartition, Integer> leaders = Map.of(tp0, 1, tp1, 2);
+        driver.onResponse(time.milliseconds(), requestSpecs.get(0), 
metadataResponseWithPartitionLeaders(leaders), Node.noNode());
+        assertFalse(result.all().get(tp0).isDone());
+        assertFalse(result.all().get(tp1).isDone());
+
+        assertEquals(1, partitionLeaderCache.get(tp0));
+        assertEquals(2, partitionLeaderCache.get(tp1));
+
+        // Second, the fulfillment stage makes the actual requests
+        requestSpecs = driver.poll();
+        assertEquals(2, requestSpecs.size());
+
+        assertEquals(OptionalInt.of(1), 
requestSpecs.get(0).scope.destinationBrokerId());
+        assertEquals(OptionalInt.of(2), 
requestSpecs.get(1).scope.destinationBrokerId());
+
+        driver.onResponse(time.milliseconds(), requestSpecs.get(0), 
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_1);
+        driver.onResponse(time.milliseconds(), requestSpecs.get(1), 
listOffsetsResponseSuccess(requestSpecs.get(1).keys), NODE_2);
+        assertTrue(result.all().get(tp0).isDone());
+        assertTrue(result.all().get(tp1).isDone());
+
+        // On the second request, the partition leader cache already contains 
all the leadership
+        // data so the request goes straight to the fulfillment stage
+        result = new 
PartitionLeaderStrategy.PartitionLeaderFuture<>(requestKeys, 
partitionLeaderCache);
+        driver = buildDriver(result);
+
+        requestSpecs = driver.poll();
+        assertEquals(2, requestSpecs.size());
+
+        // We can tell this is the fulfillment stage by the destination broker 
id being set
+        assertEquals(OptionalInt.of(1), 
requestSpecs.get(0).scope.destinationBrokerId());
+        assertEquals(OptionalInt.of(2), 
requestSpecs.get(1).scope.destinationBrokerId());
+
+        driver.onResponse(time.milliseconds(), requestSpecs.get(0), 
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_1);
+        driver.onResponse(time.milliseconds(), requestSpecs.get(1), 
listOffsetsResponseSuccess(requestSpecs.get(1).keys), NODE_2);
+        assertTrue(result.all().get(tp0).isDone());
+        assertTrue(result.all().get(tp1).isDone());
+    }
+
+    @Test
+    public void testCachingOverlappingRequests() {
+        // This test uses several requests to exercise the caching in various 
ways:
+        // 1) for T-0 and T-1            (initially the cache is empty)
+        // 2) for T-1 and T-2            (leadership data for T-1 should be 
cached from previous request)
+        // 3) for T-0, T-1 and T-2       (all leadership data should be cached 
already)
+        // 4) for T-0, T-1, T-2 and T-3  (just T-3 needs to be looked up)
+        Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
+
+        TopicPartition tp0 = new TopicPartition("T", 0);
+        TopicPartition tp1 = new TopicPartition("T", 1);
+        TopicPartition tp2 = new TopicPartition("T", 2);
+        TopicPartition tp3 = new TopicPartition("T", 3);
+
+        //
+        // Request 1 - T-0 and T-1
+        //
+        Set<TopicPartition> requestKeys = Set.of(tp0, tp1);
+
+        // First, the lookup stage needs to obtain leadership data because the 
cache is empty
+        PartitionLeaderStrategy.PartitionLeaderFuture<Void> result =
+            new PartitionLeaderStrategy.PartitionLeaderFuture<>(requestKeys, 
partitionLeaderCache);
+        AdminApiDriver<TopicPartition, Void> driver = buildDriver(result);
+
+        List<AdminApiDriver.RequestSpec<TopicPartition>> requestSpecs = 
driver.poll();
+        assertEquals(1, requestSpecs.size());
+
+        assertEquals(OptionalInt.empty(), 
requestSpecs.get(0).scope.destinationBrokerId());
+        assertEquals(requestKeys, requestSpecs.get(0).keys);
+
+        // The cache will be populated using the leader information from this 
metadata response
+        Map<TopicPartition, Integer> leaders = Map.of(tp0, 1, tp1, 2);
+        driver.onResponse(time.milliseconds(), requestSpecs.get(0), 
metadataResponseWithPartitionLeaders(leaders), Node.noNode());
+        assertFalse(result.all().get(tp0).isDone());
+        assertFalse(result.all().get(tp1).isDone());
+
+        assertEquals(1, partitionLeaderCache.get(tp0));
+        assertEquals(2, partitionLeaderCache.get(tp1));
+
+        // Second, the fulfillment stage makes the actual requests
+        requestSpecs = driver.poll();
+        assertEquals(2, requestSpecs.size());
+
+        assertEquals(OptionalInt.of(1), 
requestSpecs.get(0).scope.destinationBrokerId());
+        assertEquals(OptionalInt.of(2), 
requestSpecs.get(1).scope.destinationBrokerId());
+
+        driver.onResponse(time.milliseconds(), requestSpecs.get(0), 
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_1);
+        driver.onResponse(time.milliseconds(), requestSpecs.get(1), 
listOffsetsResponseSuccess(requestSpecs.get(1).keys), NODE_2);
+        assertTrue(result.all().get(tp0).isDone());
+        assertTrue(result.all().get(tp1).isDone());
+
+        //
+        // Request 2 - T-1 and T-2
+        //
+        // On the second request, the partition leader cache already contains 
some of the leadership data.
+        // Now the lookup and fulfillment stages overlap.
+        requestKeys = Set.of(tp1, tp2);
+        result = new 
PartitionLeaderStrategy.PartitionLeaderFuture<>(requestKeys, 
partitionLeaderCache);
+        driver = buildDriver(result);
+
+        requestSpecs = driver.poll();
+        assertEquals(2, requestSpecs.size());
+
+        assertEquals(OptionalInt.empty(), 
requestSpecs.get(0).scope.destinationBrokerId());
+        assertEquals(Collections.singleton(tp2), requestSpecs.get(0).keys);
+        assertEquals(OptionalInt.of(2), 
requestSpecs.get(1).scope.destinationBrokerId());
+
+        // The cache will be populated using the leader information from this 
metadata response
+        leaders = Map.of(tp2, 1);
+        driver.onResponse(time.milliseconds(), requestSpecs.get(0), 
metadataResponseWithPartitionLeaders(leaders), Node.noNode());
+        driver.onResponse(time.milliseconds(), requestSpecs.get(1), 
listOffsetsResponseSuccess(requestSpecs.get(1).keys), NODE_2);
+        assertTrue(result.all().get(tp1).isDone());  // Already fulfilled
+        assertFalse(result.all().get(tp2).isDone());
+
+        assertEquals(1, partitionLeaderCache.get(tp0));
+        assertEquals(2, partitionLeaderCache.get(tp1));
+        assertEquals(1, partitionLeaderCache.get(tp2));
+
+        // Finally, the fulfillment stage makes the actual request for the 
uncached topic-partition
+        requestSpecs = driver.poll();
+        assertEquals(1, requestSpecs.size());
+
+        assertEquals(OptionalInt.of(1), 
requestSpecs.get(0).scope.destinationBrokerId());
+
+        driver.onResponse(time.milliseconds(), requestSpecs.get(0), 
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_1);
+        assertTrue(result.all().get(tp1).isDone());
+        assertTrue(result.all().get(tp2).isDone());
+
+        //
+        // Request 3 - T-0, T-1 and T-2
+        //
+        // On the third request, the partition leader cache contains all the 
leadership data
+        requestKeys = Set.of(tp0, tp1, tp2);
+        result = new 
PartitionLeaderStrategy.PartitionLeaderFuture<>(requestKeys, 
partitionLeaderCache);
+        driver = buildDriver(result);
+
+        requestSpecs = driver.poll();
+        assertEquals(2, requestSpecs.size());
+
+        // We can tell this is the fulfillment stage by the destination broker 
id being set
+        assertEquals(OptionalInt.of(1), 
requestSpecs.get(0).scope.destinationBrokerId());
+        assertEquals(OptionalInt.of(2), 
requestSpecs.get(1).scope.destinationBrokerId());
+
+        driver.onResponse(time.milliseconds(), requestSpecs.get(0), 
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_1);
+        driver.onResponse(time.milliseconds(), requestSpecs.get(1), 
listOffsetsResponseSuccess(requestSpecs.get(1).keys), NODE_2);
+        assertTrue(result.all().get(tp0).isDone());
+        assertTrue(result.all().get(tp1).isDone());
+        assertTrue(result.all().get(tp2).isDone());
+
+        //
+        // Request 4 - T-0, T-1, T-2 and T-3
+        //
+        // On the fourth request, the partition leader cache already contains 
some of the leadership data.
+        // Now the lookup and fulfillment stages overlap.
+        requestKeys = Set.of(tp0, tp1, tp2, tp3);
+        result = new 
PartitionLeaderStrategy.PartitionLeaderFuture<>(requestKeys, 
partitionLeaderCache);
+        driver = buildDriver(result);
+
+        requestSpecs = driver.poll();
+        assertEquals(3, requestSpecs.size());
+
+        assertEquals(OptionalInt.empty(), 
requestSpecs.get(0).scope.destinationBrokerId());
+        assertEquals(Collections.singleton(tp3), requestSpecs.get(0).keys);
+        assertEquals(OptionalInt.of(1), 
requestSpecs.get(1).scope.destinationBrokerId());
+        assertEquals(OptionalInt.of(2), 
requestSpecs.get(2).scope.destinationBrokerId());
+
+        // The cache will be populated using the leader information from this 
metadata response
+        leaders = Map.of(tp3, 2);
+        driver.onResponse(time.milliseconds(), requestSpecs.get(0), 
metadataResponseWithPartitionLeaders(leaders), Node.noNode());
+        driver.onResponse(time.milliseconds(), requestSpecs.get(1), 
listOffsetsResponseSuccess(requestSpecs.get(1).keys), NODE_1);
+        driver.onResponse(time.milliseconds(), requestSpecs.get(2), 
listOffsetsResponseSuccess(requestSpecs.get(2).keys), NODE_2);
+        assertTrue(result.all().get(tp0).isDone());  // Already fulfilled
+        assertTrue(result.all().get(tp1).isDone());  // Already fulfilled
+        assertTrue(result.all().get(tp2).isDone());  // Already fulfilled
+        assertFalse(result.all().get(tp3).isDone());
+
+        assertEquals(1, partitionLeaderCache.get(tp0));
+        assertEquals(2, partitionLeaderCache.get(tp1));
+        assertEquals(1, partitionLeaderCache.get(tp2));
+        assertEquals(2, partitionLeaderCache.get(tp3));
+
+        // Finally, the fulfillment stage makes the actual request for the 
uncached topic-partition
+        requestSpecs = driver.poll();
+        assertEquals(1, requestSpecs.size());
+
+        assertEquals(OptionalInt.of(2), 
requestSpecs.get(0).scope.destinationBrokerId());
+
+        driver.onResponse(time.milliseconds(), requestSpecs.get(0), 
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_2);
+        assertTrue(result.all().get(tp0).isDone());
+        assertTrue(result.all().get(tp1).isDone());
+        assertTrue(result.all().get(tp2).isDone());
+        assertTrue(result.all().get(tp3).isDone());
+    }
+
+    @Test
+    public void testNotLeaderFulfillmentError() {
+        Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
+
+        TopicPartition tp0 = new TopicPartition("T", 0);
+        TopicPartition tp1 = new TopicPartition("T", 1);
+        Set<TopicPartition> requestKeys = Set.of(tp0, tp1);
+
+        // First, the lookup stage needs to obtain leadership data because the 
cache is empty
+        PartitionLeaderStrategy.PartitionLeaderFuture<Void> result =
+            new PartitionLeaderStrategy.PartitionLeaderFuture<>(requestKeys, 
partitionLeaderCache);
+        AdminApiDriver<TopicPartition, Void> driver = buildDriver(result);
+
+        List<AdminApiDriver.RequestSpec<TopicPartition>> requestSpecs = 
driver.poll();
+        assertEquals(1, requestSpecs.size());
+
+        assertEquals(OptionalInt.empty(), 
requestSpecs.get(0).scope.destinationBrokerId());
+        assertEquals(requestKeys, requestSpecs.get(0).keys);
+
+        // The cache will be populated using the leader information from this 
metadata response
+        Map<TopicPartition, Integer> leaders = Map.of(tp0, 1, tp1, 2);
+        driver.onResponse(time.milliseconds(), requestSpecs.get(0), 
metadataResponseWithPartitionLeaders(leaders), Node.noNode());
+        assertFalse(result.all().get(tp0).isDone());
+        assertFalse(result.all().get(tp1).isDone());
+
+        assertEquals(1, partitionLeaderCache.get(tp0));
+        assertEquals(2, partitionLeaderCache.get(tp1));
+
+        // Second, the fulfillment stage makes the actual requests
+        requestSpecs = driver.poll();
+        assertEquals(2, requestSpecs.size());
+
+        assertEquals(OptionalInt.of(1), 
requestSpecs.get(0).scope.destinationBrokerId());
+        assertEquals(OptionalInt.of(2), 
requestSpecs.get(1).scope.destinationBrokerId());
+
+        driver.onResponse(time.milliseconds(), requestSpecs.get(0), 
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_1);
+        driver.onResponse(time.milliseconds(), requestSpecs.get(1), 
listOffsetsResponseFailure(requestSpecs.get(1).keys, 
Errors.NOT_LEADER_OR_FOLLOWER), NODE_2);
+        assertTrue(result.all().get(tp0).isDone());
+        assertFalse(result.all().get(tp1).isDone());
+
+        // Now the lookup occurs again - change leadership to node 1
+        requestSpecs = driver.poll();
+        assertEquals(1, requestSpecs.size());
+
+        assertEquals(OptionalInt.empty(), 
requestSpecs.get(0).scope.destinationBrokerId());
+
+        leaders = Map.of(tp1, 1);
+        driver.onResponse(time.milliseconds(), requestSpecs.get(0), 
metadataResponseWithPartitionLeaders(leaders), Node.noNode());
+        assertTrue(result.all().get(tp0).isDone());
+        assertFalse(result.all().get(tp1).isDone());
+
+        assertEquals(1, partitionLeaderCache.get(tp0));
+        assertEquals(1, partitionLeaderCache.get(tp1));
+
+        // And the fulfillment stage makes the actual request
+        requestSpecs = driver.poll();
+        assertEquals(1, requestSpecs.size());
+
+        assertEquals(OptionalInt.of(1), 
requestSpecs.get(0).scope.destinationBrokerId());
+
+        driver.onResponse(time.milliseconds(), requestSpecs.get(0), 
listOffsetsResponseSuccess(requestSpecs.get(0).keys), NODE_1);
+        assertTrue(result.all().get(tp0).isDone());
+        assertTrue(result.all().get(tp1).isDone());
+    }
+
+    @Test
+    public void testFatalLookupError() {
+        TopicPartition tp0 = new TopicPartition("T", 0);
+        Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
+        PartitionLeaderStrategy.PartitionLeaderFuture<Void> result =
+            new 
PartitionLeaderStrategy.PartitionLeaderFuture<>(Collections.singleton(tp0), 
partitionLeaderCache);
+        AdminApiDriver<TopicPartition, Void> driver = buildDriver(result);
+
+        List<AdminApiDriver.RequestSpec<TopicPartition>> requestSpecs = 
driver.poll();
+        assertEquals(1, requestSpecs.size());
+
+        AdminApiDriver.RequestSpec<TopicPartition> spec = requestSpecs.get(0);
+        assertEquals(Collections.singleton(tp0), spec.keys);
+
+        driver.onFailure(time.milliseconds(), spec, new 
UnknownServerException());
+        assertTrue(result.all().get(tp0).isDone());
+        TestUtils.assertFutureThrows(result.all().get(tp0), 
UnknownServerException.class);
+        assertEquals(Collections.emptyList(), driver.poll());
+    }
+
+    @Test
+    public void testRetryLookupAfterDisconnect() {
+        TopicPartition tp0 = new TopicPartition("T", 0);
+        Map<TopicPartition, Integer> partitionLeaderCache = new HashMap<>();
+        PartitionLeaderStrategy.PartitionLeaderFuture<Void> result =
+            new 
PartitionLeaderStrategy.PartitionLeaderFuture<>(Collections.singleton(tp0), 
partitionLeaderCache);
+        AdminApiDriver<TopicPartition, Void> driver = buildDriver(result);
+
+        List<AdminApiDriver.RequestSpec<TopicPartition>> requestSpecs = 
driver.poll();
+        assertEquals(1, requestSpecs.size());
+
+        AdminApiDriver.RequestSpec<TopicPartition> spec = requestSpecs.get(0);
+        assertEquals(Collections.singleton(tp0), spec.keys);
+
+        driver.onFailure(time.milliseconds(), spec, new DisconnectException());
+        List<AdminApiDriver.RequestSpec<TopicPartition>> retrySpecs = 
driver.poll();
+        assertEquals(1, retrySpecs.size());
+
+        AdminApiDriver.RequestSpec<TopicPartition> retrySpec = 
retrySpecs.get(0);
+        assertEquals(Collections.singleton(tp0), retrySpec.keys);
+        assertEquals(time.milliseconds(), retrySpec.nextAllowedTryMs);
+        assertEquals(Collections.emptyList(), driver.poll());
+    }
+
+    private MetadataResponse 
metadataResponseWithPartitionLeaders(Map<TopicPartition, Integer> mapping) {
+        MetadataResponseData response = new MetadataResponseData();
+        mapping.forEach((tp, brokerId) -> response.topics().add(new 
MetadataResponseData.MetadataResponseTopic()
+            .setName(tp.topic())
+            .setPartitions(Collections.singletonList(new 
MetadataResponseData.MetadataResponsePartition()
+                .setPartitionIndex(tp.partition())
+                .setLeaderId(brokerId)))));
+        return new MetadataResponse(response, 
ApiKeys.METADATA.latestVersion());
+    }
+
+    private ListOffsetsResponse listOffsetsResponseSuccess(Set<TopicPartition> 
keys) {
+        // This structure is not quite how Kafka does it, but it works for the 
MockApiHandler
+        ListOffsetsResponseData response = new ListOffsetsResponseData();
+        keys.forEach(tp -> {
+            ListOffsetsResponseData.ListOffsetsPartitionResponse partResponse =
+                new ListOffsetsResponseData.ListOffsetsPartitionResponse()
+                    .setPartitionIndex(tp.partition());
+            ListOffsetsResponseData.ListOffsetsTopicResponse topicResponse =
+                new ListOffsetsResponseData.ListOffsetsTopicResponse()
+                    .setName(tp.topic())
+                    .setPartitions(Collections.singletonList(partResponse));
+            response.topics().add(topicResponse);
+        });
+        return new ListOffsetsResponse(response);
+    }
+
+    private ListOffsetsResponse listOffsetsResponseFailure(Set<TopicPartition> 
keys, Errors error) {
+        // This structure is not quite how Kafka does it, but it works for the 
MockApiHandler
+        ListOffsetsResponseData response = new ListOffsetsResponseData();
+        keys.forEach(tp -> {
+            ListOffsetsResponseData.ListOffsetsPartitionResponse partResponse =
+                new ListOffsetsResponseData.ListOffsetsPartitionResponse()
+                    .setPartitionIndex(tp.partition())
+                    .setErrorCode(error.code());
+            ListOffsetsResponseData.ListOffsetsTopicResponse topicResponse =
+                new ListOffsetsResponseData.ListOffsetsTopicResponse()
+                    .setName(tp.topic())
+                    .setPartitions(Collections.singletonList(partResponse));
+            response.topics().add(topicResponse);
+        });
+        return new ListOffsetsResponse(response);
+    }
+
+    private class MockApiHandler extends 
AdminApiHandler.Batched<TopicPartition, Void> {
+        private final PartitionLeaderStrategy partitionLeaderStrategy = new 
PartitionLeaderStrategy(logContext);
+
+        @Override
+        public String apiName() {
+            return "mock-api";
+        }
+
+        @Override
+        public AbstractRequest.Builder<?> buildBatchedRequest(
+            int brokerId,
+            Set<TopicPartition> keys
+        ) {
+            return new MetadataRequest.Builder(new MetadataRequestData());
+        }
+
+        @Override
+        public ApiResult<TopicPartition, Void> handleResponse(
+            Node broker,
+            Set<TopicPartition> keys,
+            AbstractResponse abstractResponse
+        ) {
+            ListOffsetsResponse response = (ListOffsetsResponse) 
abstractResponse;
+
+            Map<TopicPartition, Void> completed = new HashMap<>();
+            Map<TopicPartition, Throwable> failed = new HashMap<>();
+            List<TopicPartition> unmapped = new ArrayList<>();
+
+            response.topics().forEach(topic -> 
topic.partitions().forEach(partition -> {
+                TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                if (partition.errorCode() != Errors.NONE.code()) {
+                    Exception exception = 
Errors.forCode(partition.errorCode()).exception();
+                    if (exception instanceof NotLeaderOrFollowerException || 
exception instanceof LeaderNotAvailableException) {
+                        unmapped.add(tp);
+                    } else if (!(exception instanceof RetriableException)) {
+                        failed.put(tp, 
Errors.forCode(partition.errorCode()).exception());
+                    }
+                } else {
+                    completed.put(tp, null);
+                }
+            }));
+
+            return new ApiResult<>(completed, failed, unmapped);
+        }
+
+        @Override
+        public PartitionLeaderStrategy lookupStrategy() {
+            return partitionLeaderStrategy;
+        }
+    }
+}


Reply via email to