This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b2d647904c0 KAFKA-8982: Add retry of fetching metadata to 
Admin.deleteRecords (#13760)
b2d647904c0 is described below

commit b2d647904c0d9e02509bc303ae42d880a90d4742
Author: Gantigmaa Selenge <[email protected]>
AuthorDate: Mon Jul 3 02:13:55 2023 +0100

    KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords (#13760)
    
    Use AdminApiDriver class to refresh the metadata and retry the request that 
failed with retriable errors.
    
    Reviewers: Luke Chen <[email protected]>, Divij Vaidya <[email protected]>, 
Mickael Maison <[email protected]>, Dimitar Dimitrov 
<[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 131 +----------
 .../admin/internals/DeleteRecordsHandler.java      | 178 +++++++++++++++
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  64 ++----
 .../admin/internals/DeleteRecordsHandlerTest.java  | 249 +++++++++++++++++++++
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |   8 +-
 5 files changed, 452 insertions(+), 178 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f2294df3a3a..140021a4c72 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -44,6 +44,7 @@ import 
org.apache.kafka.clients.admin.internals.AlterConsumerGroupOffsetsHandler
 import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import 
org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
 import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
+import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
 import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
 import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
 import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
@@ -121,11 +122,6 @@ import 
org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
 import org.apache.kafka.common.message.DeleteAclsResponseData;
 import 
org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult;
 import 
org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsMatchingAcl;
-import org.apache.kafka.common.message.DeleteRecordsRequestData;
-import 
org.apache.kafka.common.message.DeleteRecordsRequestData.DeleteRecordsPartition;
-import 
org.apache.kafka.common.message.DeleteRecordsRequestData.DeleteRecordsTopic;
-import org.apache.kafka.common.message.DeleteRecordsResponseData;
-import 
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsTopicResult;
 import org.apache.kafka.common.message.DeleteTopicsRequestData;
 import 
org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState;
 import 
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
@@ -186,8 +182,6 @@ import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsRequest;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
-import org.apache.kafka.common.requests.DeleteRecordsRequest;
-import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DeleteTopicsRequest;
 import org.apache.kafka.common.requests.DeleteTopicsResponse;
 import org.apache.kafka.common.requests.DescribeAclsRequest;
@@ -2917,6 +2911,7 @@ public class KafkaAdminClient extends AdminClient {
                 
maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
                     throwable, futures, quotaExceededExceptions, (int) 
(time.milliseconds() - now));
                 // Fail all the other remaining futures
+
                 completeAllExceptionally(futures.values(), throwable);
             }
         };
@@ -2925,123 +2920,15 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public DeleteRecordsResult deleteRecords(final Map<TopicPartition, 
RecordsToDelete> recordsToDelete,
                                              final DeleteRecordsOptions 
options) {
-
-        // requests need to be sent to partitions leader nodes so ...
-        // ... from the provided map it's needed to create more maps grouping 
topic/partition per leader
-
-        final Map<TopicPartition, KafkaFutureImpl<DeletedRecords>> futures = 
new HashMap<>(recordsToDelete.size());
-        for (TopicPartition topicPartition: recordsToDelete.keySet()) {
-            futures.put(topicPartition, new KafkaFutureImpl<>());
+        SimpleAdminApiFuture<TopicPartition, DeletedRecords> future = 
DeleteRecordsHandler.newFuture(recordsToDelete.keySet());
+        int timeoutMs = defaultApiTimeoutMs;
+        if (options.timeoutMs() != null) {
+            timeoutMs = options.timeoutMs();
         }
+        DeleteRecordsHandler handler = new 
DeleteRecordsHandler(recordsToDelete, logContext, timeoutMs);
+        invokeDriver(handler, future, options.timeoutMs);
 
-        // preparing topics list for asking metadata about them
-        final Set<String> topics = new HashSet<>();
-        for (TopicPartition topicPartition: recordsToDelete.keySet()) {
-            topics.add(topicPartition.topic());
-        }
-
-        final long nowMetadata = time.milliseconds();
-        final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
-        // asking for topics metadata for getting partitions leaders
-        runnable.call(new Call("topicsMetadata", deadline,
-                new LeastLoadedNodeProvider()) {
-
-            @Override
-            MetadataRequest.Builder createRequest(int timeoutMs) {
-                return new MetadataRequest.Builder(new MetadataRequestData()
-                    .setTopics(convertToMetadataRequestTopic(topics))
-                    .setAllowAutoTopicCreation(false));
-            }
-
-            @Override
-            void handleResponse(AbstractResponse abstractResponse) {
-                MetadataResponse response = (MetadataResponse) 
abstractResponse;
-
-                Map<String, Errors> errors = response.errors();
-                Cluster cluster = response.buildCluster();
-
-                // Group topic partitions by leader
-                Map<Node, Map<String, DeleteRecordsTopic>> leaders = new 
HashMap<>();
-                for (Map.Entry<TopicPartition, RecordsToDelete> entry: 
recordsToDelete.entrySet()) {
-                    TopicPartition topicPartition = entry.getKey();
-                    KafkaFutureImpl<DeletedRecords> future = 
futures.get(topicPartition);
-
-                    // Fail partitions with topic errors
-                    Errors topicError = errors.get(topicPartition.topic());
-                    if (errors.containsKey(topicPartition.topic())) {
-                        future.completeExceptionally(topicError.exception());
-                    } else {
-                        Node node = cluster.leaderFor(topicPartition);
-                        if (node != null) {
-                            Map<String, DeleteRecordsTopic> deletionsForLeader 
= leaders.computeIfAbsent(
-                                    node, key -> new HashMap<>());
-                            DeleteRecordsTopic deleteRecords = 
deletionsForLeader.get(topicPartition.topic());
-                            if (deleteRecords == null) {
-                                deleteRecords = new DeleteRecordsTopic()
-                                        .setName(topicPartition.topic());
-                                deletionsForLeader.put(topicPartition.topic(), 
deleteRecords);
-                            }
-                            deleteRecords.partitions().add(new 
DeleteRecordsPartition()
-                                    
.setPartitionIndex(topicPartition.partition())
-                                    
.setOffset(entry.getValue().beforeOffset()));
-                        } else {
-                            
future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception());
-                        }
-                    }
-                }
-
-                final long deleteRecordsCallTimeMs = time.milliseconds();
-
-                for (final Map.Entry<Node, Map<String, DeleteRecordsTopic>> 
entry : leaders.entrySet()) {
-                    final Map<String, DeleteRecordsTopic> 
partitionDeleteOffsets = entry.getValue();
-                    final int brokerId = entry.getKey().id();
-
-                    runnable.call(new Call("deleteRecords", deadline,
-                            new ConstantNodeIdProvider(brokerId)) {
-
-                        @Override
-                        DeleteRecordsRequest.Builder createRequest(int 
timeoutMs) {
-                            return new DeleteRecordsRequest.Builder(new 
DeleteRecordsRequestData()
-                                    .setTimeoutMs(timeoutMs)
-                                    .setTopics(new 
ArrayList<>(partitionDeleteOffsets.values())));
-                        }
-
-                        @Override
-                        void handleResponse(AbstractResponse abstractResponse) 
{
-                            DeleteRecordsResponse response = 
(DeleteRecordsResponse) abstractResponse;
-                            for (DeleteRecordsTopicResult topicResult: 
response.data().topics()) {
-                                for 
(DeleteRecordsResponseData.DeleteRecordsPartitionResult partitionResult : 
topicResult.partitions()) {
-                                    KafkaFutureImpl<DeletedRecords> future = 
futures.get(new TopicPartition(topicResult.name(), 
partitionResult.partitionIndex()));
-                                    if (partitionResult.errorCode() == 
Errors.NONE.code()) {
-                                        future.complete(new 
DeletedRecords(partitionResult.lowWatermark()));
-                                    } else {
-                                        
future.completeExceptionally(Errors.forCode(partitionResult.errorCode()).exception());
-                                    }
-                                }
-                            }
-                        }
-
-                        @Override
-                        void handleFailure(Throwable throwable) {
-                            Stream<KafkaFutureImpl<DeletedRecords>> 
callFutures =
-                                    
partitionDeleteOffsets.values().stream().flatMap(
-                                        recordsToDelete ->
-                                                
recordsToDelete.partitions().stream().map(partitionsToDelete ->
-                                                    new 
TopicPartition(recordsToDelete.name(), partitionsToDelete.partitionIndex()))
-                                    ).map(futures::get);
-                            completeAllExceptionally(callFutures, throwable);
-                        }
-                    }, deleteRecordsCallTimeMs);
-                }
-            }
-
-            @Override
-            void handleFailure(Throwable throwable) {
-                completeAllExceptionally(futures.values(), throwable);
-            }
-        }, nowMetadata);
-
-        return new DeleteRecordsResult(new HashMap<>(futures));
+        return new DeleteRecordsResult(future.all());
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
new file mode 100644
index 00000000000..2daad226034
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.admin.DeletedRecords;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import 
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.DeleteRecordsRequestData;
+import org.apache.kafka.common.message.DeleteRecordsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DeleteRecordsRequest;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public final class DeleteRecordsHandler extends Batched<TopicPartition, 
DeletedRecords> {
+
+    private final Map<TopicPartition, RecordsToDelete> recordsToDelete;
+    private final Logger log;
+    private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
+
+    private final int timeout;
+
+    public DeleteRecordsHandler(
+            Map<TopicPartition, RecordsToDelete> recordsToDelete,
+            LogContext logContext, int timeout
+    ) {
+        this.recordsToDelete = recordsToDelete;
+        this.log = logContext.logger(DeleteRecordsHandler.class);
+        this.lookupStrategy = new PartitionLeaderStrategy(logContext);
+        this.timeout = timeout;
+    }
+
+    @Override
+    public String apiName() {
+        return "deleteRecords";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
+        return this.lookupStrategy;
+    }
+
+    public static SimpleAdminApiFuture<TopicPartition, DeletedRecords> 
newFuture(
+            Collection<TopicPartition> topicPartitions
+    ) {
+        return AdminApiFuture.forKeys(new HashSet<>(topicPartitions));
+    }
+
+    @Override
+    public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, 
Set<TopicPartition> keys) {
+        Map<String, DeleteRecordsRequestData.DeleteRecordsTopic> 
deletionsForTopic = new HashMap<>();
+        for (Map.Entry<TopicPartition, RecordsToDelete> entry: 
recordsToDelete.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords = 
deletionsForTopic.computeIfAbsent(
+                    topicPartition.topic(),
+                    key -> new 
DeleteRecordsRequestData.DeleteRecordsTopic().setName(topicPartition.topic())
+            );
+            deleteRecords.partitions().add(new 
DeleteRecordsRequestData.DeleteRecordsPartition()
+                    .setPartitionIndex(topicPartition.partition())
+                    .setOffset(entry.getValue().beforeOffset()));
+        }
+
+        DeleteRecordsRequestData data = new DeleteRecordsRequestData()
+                .setTopics(new ArrayList<>(deletionsForTopic.values()))
+                .setTimeoutMs(timeout);
+        return new DeleteRecordsRequest.Builder(data);
+    }
+
+
+    @Override
+    public ApiResult<TopicPartition, DeletedRecords> handleResponse(
+        Node broker,
+        Set<TopicPartition> keys,
+        AbstractResponse abstractResponse
+    ) {
+        DeleteRecordsResponse response = (DeleteRecordsResponse) 
abstractResponse;
+        Map<TopicPartition, DeletedRecords> completed = new HashMap<>();
+        Map<TopicPartition, Throwable> failed = new HashMap<>();
+        List<TopicPartition> unmapped = new ArrayList<>();
+        Set<TopicPartition> retriable = new HashSet<>();
+
+        for (DeleteRecordsResponseData.DeleteRecordsTopicResult topicResult: 
response.data().topics()) {
+            for (DeleteRecordsResponseData.DeleteRecordsPartitionResult 
partitionResult : topicResult.partitions()) {
+                Errors error = Errors.forCode(partitionResult.errorCode());
+                TopicPartition topicPartition = new 
TopicPartition(topicResult.name(), partitionResult.partitionIndex());
+                if (error == Errors.NONE) {
+                    completed.put(topicPartition, new 
DeletedRecords(partitionResult.lowWatermark()));
+                } else {
+                    handlePartitionError(topicPartition, error, failed, 
unmapped, retriable);
+                }
+            }
+        }
+
+        // Sanity-check if the current leader for these partitions returned 
results for all of them
+        for (TopicPartition topicPartition : keys) {
+            if (unmapped.isEmpty()
+                    && !completed.containsKey(topicPartition)
+                    && !failed.containsKey(topicPartition)
+                    && !retriable.contains(topicPartition)
+            ) {
+                ApiException sanityCheckException = new ApiException(
+                        "The response from broker " + broker.id() +
+                                " did not contain a result for topic partition 
" + topicPartition);
+                log.error(
+                        "DeleteRecords request for topic partition {} failed 
sanity check",
+                        topicPartition,
+                        sanityCheckException);
+                failed.put(topicPartition, sanityCheckException);
+            }
+        }
+
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private void handlePartitionError(
+        TopicPartition topicPartition,
+        Errors error,
+        Map<TopicPartition, Throwable> failed,
+        List<TopicPartition> unmapped,
+        Set<TopicPartition> retriable
+    ) {
+        if (error.exception() instanceof InvalidMetadataException) {
+            log.debug(
+                "DeleteRecords lookup request for topic partition {} will be 
retried due to invalid leader metadata {}",
+                 topicPartition,
+                 error);
+            unmapped.add(topicPartition);
+        } else if (error.exception() instanceof RetriableException) {
+            log.debug(
+                "DeleteRecords fulfillment request for topic partition {} will 
be retried due to {}",
+                topicPartition,
+                error);
+            retriable.add(topicPartition);
+        } else if (error.exception() instanceof TopicAuthorizationException) {
+            log.error(
+                "DeleteRecords request for topic partition {} failed due to an 
error {}",
+                topicPartition,
+                error);
+            failed.put(topicPartition, error.exception());
+        } else {
+            log.error(
+                "DeleteRecords request for topic partition {} failed due to an 
unexpected error {}",
+                topicPartition,
+                error);
+            failed.put(topicPartition, error.exception());
+        }
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 27ae3471f4a..cc68f154b02 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -53,9 +53,7 @@ import 
org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.LogDirNotFoundException;
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
@@ -2275,9 +2273,9 @@ public class KafkaAdminClientTest {
         List<PartitionInfo> partitionInfos = new ArrayList<>();
         partitionInfos.add(new PartitionInfo("my_topic", 0, nodes.get(0), new 
Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
         partitionInfos.add(new PartitionInfo("my_topic", 1, nodes.get(0), new 
Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
-        partitionInfos.add(new PartitionInfo("my_topic", 2, null, new Node[] 
{nodes.get(0)}, new Node[] {nodes.get(0)}));
+        partitionInfos.add(new PartitionInfo("my_topic", 2, nodes.get(0), new 
Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
         partitionInfos.add(new PartitionInfo("my_topic", 3, nodes.get(0), new 
Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
-        partitionInfos.add(new PartitionInfo("my_topic", 4, nodes.get(0), new 
Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
+
         Cluster cluster = new Cluster("mockClusterId", nodes.values(),
                 partitionInfos, Collections.emptySet(),
                 Collections.emptySet(), nodes.get(0));
@@ -2286,11 +2284,15 @@ public class KafkaAdminClientTest {
         TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
         TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
         TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3);
-        TopicPartition myTopicPartition4 = new TopicPartition("my_topic", 4);
+
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.LEADER_NOT_AVAILABLE));
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.UNKNOWN_TOPIC_OR_PARTITION));
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
             DeleteRecordsResponseData m = new DeleteRecordsResponseData();
             m.topics().add(new 
DeleteRecordsResponseData.DeleteRecordsTopicResult().setName(myTopicPartition0.topic())
                     .setPartitions(new 
DeleteRecordsResponseData.DeleteRecordsPartitionResultCollection(asList(
@@ -2303,36 +2305,10 @@ public class KafkaAdminClientTest {
                             
.setLowWatermark(DeleteRecordsResponse.INVALID_LOW_WATERMARK)
                             .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code()),
                         new 
DeleteRecordsResponseData.DeleteRecordsPartitionResult()
-                            .setPartitionIndex(myTopicPartition3.partition())
+                            .setPartitionIndex(myTopicPartition2.partition())
                             
.setLowWatermark(DeleteRecordsResponse.INVALID_LOW_WATERMARK)
-                            
.setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()),
-                        new 
DeleteRecordsResponseData.DeleteRecordsPartitionResult()
-                            .setPartitionIndex(myTopicPartition4.partition())
-                            
.setLowWatermark(DeleteRecordsResponse.INVALID_LOW_WATERMARK)
-                            
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                            
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code())
                     ).iterator())));
-
-            List<MetadataResponse.TopicMetadata> t = new ArrayList<>();
-            List<MetadataResponse.PartitionMetadata> p = new ArrayList<>();
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 
myTopicPartition0,
-                    Optional.of(nodes.get(0).id()), Optional.of(5), 
singletonList(nodes.get(0).id()),
-                    singletonList(nodes.get(0).id()), 
Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 
myTopicPartition1,
-                    Optional.of(nodes.get(0).id()), Optional.of(5), 
singletonList(nodes.get(0).id()),
-                    singletonList(nodes.get(0).id()), 
Collections.emptyList()));
-            p.add(new 
MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 
myTopicPartition2,
-                    Optional.empty(), Optional.empty(), 
singletonList(nodes.get(0).id()),
-                    singletonList(nodes.get(0).id()), 
Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 
myTopicPartition3,
-                    Optional.of(nodes.get(0).id()), Optional.of(5), 
singletonList(nodes.get(0).id()),
-                    singletonList(nodes.get(0).id()), 
Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 
myTopicPartition4,
-                    Optional.of(nodes.get(0).id()), Optional.of(5), 
singletonList(nodes.get(0).id()),
-                    singletonList(nodes.get(0).id()), 
Collections.emptyList()));
-
-            t.add(new MetadataResponse.TopicMetadata(Errors.NONE, "my_topic", 
false, p));
-
-            
env.kafkaClient().prepareResponse(RequestTestUtils.metadataResponse(cluster.nodes(),
 cluster.clusterResource().clusterId(), cluster.controller().id(), t));
             env.kafkaClient().prepareResponse(new DeleteRecordsResponse(m));
 
             Map<TopicPartition, RecordsToDelete> recordsToDelete = new 
HashMap<>();
@@ -2340,15 +2316,14 @@ public class KafkaAdminClientTest {
             recordsToDelete.put(myTopicPartition1, 
RecordsToDelete.beforeOffset(10L));
             recordsToDelete.put(myTopicPartition2, 
RecordsToDelete.beforeOffset(10L));
             recordsToDelete.put(myTopicPartition3, 
RecordsToDelete.beforeOffset(10L));
-            recordsToDelete.put(myTopicPartition4, 
RecordsToDelete.beforeOffset(10L));
 
             DeleteRecordsResult results = 
env.adminClient().deleteRecords(recordsToDelete);
 
             // success on records deletion for partition 0
             Map<TopicPartition, KafkaFuture<DeletedRecords>> values = 
results.lowWatermarks();
             KafkaFuture<DeletedRecords> myTopicPartition0Result = 
values.get(myTopicPartition0);
-            long lowWatermark = myTopicPartition0Result.get().lowWatermark();
-            assertEquals(lowWatermark, 3);
+            long myTopicPartition0lowWatermark = 
myTopicPartition0Result.get().lowWatermark();
+            assertEquals(3, myTopicPartition0lowWatermark);
 
             // "offset out of range" failure on records deletion for partition 
1
             KafkaFuture<DeletedRecords> myTopicPartition1Result = 
values.get(myTopicPartition1);
@@ -2359,31 +2334,22 @@ public class KafkaAdminClientTest {
                 assertTrue(e0.getCause() instanceof OffsetOutOfRangeException);
             }
 
-            // "leader not available" failure on metadata request for 
partition 2
+            // not authorized to delete records for partition 2
             KafkaFuture<DeletedRecords> myTopicPartition2Result = 
values.get(myTopicPartition2);
             try {
                 myTopicPartition2Result.get();
                 fail("get() should throw ExecutionException");
             } catch (ExecutionException e1) {
-                assertTrue(e1.getCause() instanceof 
LeaderNotAvailableException);
+                assertTrue(e1.getCause() instanceof 
TopicAuthorizationException);
             }
 
-            // "not leader for partition" failure on records deletion for 
partition 3
+            // the response does not contain a result for partition 3
             KafkaFuture<DeletedRecords> myTopicPartition3Result = 
values.get(myTopicPartition3);
             try {
                 myTopicPartition3Result.get();
                 fail("get() should throw ExecutionException");
             } catch (ExecutionException e1) {
-                assertTrue(e1.getCause() instanceof 
NotLeaderOrFollowerException);
-            }
-
-            // "unknown topic or partition" failure on records deletion for 
partition 4
-            KafkaFuture<DeletedRecords> myTopicPartition4Result = 
values.get(myTopicPartition4);
-            try {
-                myTopicPartition4Result.get();
-                fail("get() should throw ExecutionException");
-            } catch (ExecutionException e1) {
-                assertTrue(e1.getCause() instanceof 
UnknownTopicOrPartitionException);
+                assertTrue(e1.getCause() instanceof ApiException);
             }
         }
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
new file mode 100644
index 00000000000..c39747f1fba
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.admin.DeletedRecords;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.DeleteRecordsRequestData;
+import org.apache.kafka.common.message.DeleteRecordsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteRecordsRequest;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DeleteRecordsHandlerTest {
+    private final LogContext logContext = new LogContext();
+    private final int timeout = 2000;
+    private final TopicPartition t0p0 = new TopicPartition("t0", 0);
+    private final TopicPartition t0p1 = new TopicPartition("t0", 1);
+    private final TopicPartition t0p2 = new TopicPartition("t0", 2);
+    private final TopicPartition t0p3 = new TopicPartition("t0", 3);
+    private final Node node = new Node(1, "host", 1234);
+    private final Map<TopicPartition, RecordsToDelete> recordsToDelete = new 
HashMap<TopicPartition, RecordsToDelete>() {
+        {
+            put(t0p0, RecordsToDelete.beforeOffset(10L));
+            put(t0p1, RecordsToDelete.beforeOffset(10L));
+            put(t0p2, RecordsToDelete.beforeOffset(10L));
+            put(t0p3, RecordsToDelete.beforeOffset(10L));
+        }
+    };
+
+    @Test
+    public void testBuildRequestSimple() {
+        DeleteRecordsHandler handler = new 
DeleteRecordsHandler(recordsToDelete, logContext, timeout);
+        DeleteRecordsRequest request = handler.buildBatchedRequest(node.id(), 
mkSet(t0p0, t0p1)).build();
+        List<DeleteRecordsRequestData.DeleteRecordsTopic> topicPartitions = 
request.data().topics();
+        assertEquals(1, topicPartitions.size());
+        DeleteRecordsRequestData.DeleteRecordsTopic topic = 
topicPartitions.get(0);
+        assertEquals(4, topic.partitions().size());
+    }
+
+    @Test
+    public void testHandleSuccessfulResponse() {
+        AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result =
+                handleResponse(createResponse(emptyMap(), 
recordsToDelete.keySet()));
+        assertResult(result, recordsToDelete.keySet(), emptyMap(), 
emptyList(), emptySet());
+    }
+
+    @Test
+    public void testHandleRetriablePartitionTimeoutResponse() {
+        TopicPartition errorPartition = t0p0;
+        Map<TopicPartition, Short> errorsByPartition = new HashMap<>();
+        errorsByPartition.put(errorPartition, Errors.REQUEST_TIMED_OUT.code());
+
+        AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result =
+                handleResponse(createResponse(errorsByPartition));
+
+        // Timeouts should be retried within the fulfillment stage as they are 
a common type of
+        // retriable error.
+        Set<TopicPartition> retriable = singleton(errorPartition);
+        Set<TopicPartition> completed = new 
HashSet<>(recordsToDelete.keySet());
+        completed.removeAll(retriable);
+        assertResult(result, completed, emptyMap(), emptyList(), retriable);
+    }
+
+    @Test
+    public void testHandleLookupRetriablePartitionInvalidMetadataResponse() {
+        TopicPartition errorPartition = t0p0;
+        Errors error = Errors.NOT_LEADER_OR_FOLLOWER;
+        Map<TopicPartition, Short> errorsByPartition = new HashMap<>();
+        errorsByPartition.put(errorPartition, error.code());
+
+        AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result =
+                handleResponse(createResponse(errorsByPartition));
+
+        // Some invalid metadata errors should be retried from the lookup 
stage as the partition-to-leader
+        // mappings should be recalculated.
+        List<TopicPartition> unmapped = new ArrayList<>();
+        unmapped.add(errorPartition);
+        Set<TopicPartition> completed = new 
HashSet<>(recordsToDelete.keySet());
+        completed.removeAll(unmapped);
+        assertResult(result, completed, emptyMap(), unmapped, emptySet());
+    }
+
+    @Test
+    public void testHandlePartitionErrorResponse() {
+        TopicPartition errorPartition = t0p0;
+        Errors error = Errors.TOPIC_AUTHORIZATION_FAILED;
+        Map<TopicPartition, Short> errorsByPartition = new HashMap<>();
+        errorsByPartition.put(errorPartition, error.code());
+
+        AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result =
+                handleResponse(createResponse(errorsByPartition));
+
+        Map<TopicPartition, Throwable> failed = new HashMap<>();
+        failed.put(errorPartition, error.exception());
+        Set<TopicPartition> completed = new 
HashSet<>(recordsToDelete.keySet());
+        completed.removeAll(failed.keySet());
+        assertResult(result, completed, failed, emptyList(), emptySet());
+    }
+
+    @Test
+    public void testHandleUnexpectedPartitionErrorResponse() {
+        TopicPartition errorPartition = t0p0;
+        Errors error = Errors.UNKNOWN_SERVER_ERROR;
+        Map<TopicPartition, Short> errorsByPartition = new HashMap<>();
+        errorsByPartition.put(errorPartition, error.code());
+
+        AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result =
+                handleResponse(createResponse(errorsByPartition));
+
+        Map<TopicPartition, Throwable> failed = new HashMap<>();
+        failed.put(errorPartition, error.exception());
+        Set<TopicPartition> completed = new 
HashSet<>(recordsToDelete.keySet());
+        completed.removeAll(failed.keySet());
+        assertResult(result, completed, failed, emptyList(), emptySet());
+    }
+
+    @Test
+    public void testMixedResponse() {
+        Map<TopicPartition, Short> errorsByPartition = new HashMap<>();
+
+        TopicPartition errorPartition = t0p0;
+        Errors error = Errors.UNKNOWN_SERVER_ERROR;
+        errorsByPartition.put(errorPartition, error.code());
+
+        TopicPartition retriableErrorPartition = t0p1;
+        Errors retriableError = Errors.NOT_LEADER_OR_FOLLOWER;
+        errorsByPartition.put(retriableErrorPartition, retriableError.code());
+
+        TopicPartition retriableErrorPartition2 = t0p2;
+        Errors retriableError2 = Errors.REQUEST_TIMED_OUT;
+        errorsByPartition.put(retriableErrorPartition2, 
retriableError2.code());
+
+        AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result =
+                handleResponse(createResponse(errorsByPartition));
+
+        Set<TopicPartition> completed = new 
HashSet<>(recordsToDelete.keySet());
+
+        Map<TopicPartition, Throwable> failed = new HashMap<>();
+        failed.put(errorPartition, error.exception());
+        completed.removeAll(failed.keySet());
+
+        List<TopicPartition> unmapped = new ArrayList<>();
+        unmapped.add(retriableErrorPartition);
+        completed.removeAll(unmapped);
+
+        Set<TopicPartition> retriable = singleton(retriableErrorPartition2);
+        completed.removeAll(retriable);
+
+        assertResult(result, completed, failed, unmapped, retriable);
+    }
+
+    @Test
+    public void testHandleResponseSanityCheck() {
+        TopicPartition errorPartition = t0p0;
+        Map<TopicPartition, RecordsToDelete> recordsToDeleteMap = new 
HashMap<>(recordsToDelete);
+        recordsToDeleteMap.remove(errorPartition);
+
+        AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result =
+                handleResponse(createResponse(emptyMap(), 
recordsToDeleteMap.keySet()));
+
+        assertEquals(recordsToDelete.size() - 1, result.completedKeys.size());
+        assertEquals(1, result.failedKeys.size());
+        assertEquals(errorPartition, 
result.failedKeys.keySet().iterator().next());
+        String sanityCheckMessage = 
result.failedKeys.get(errorPartition).getMessage();
+        assertTrue(sanityCheckMessage.contains("did not contain a result for 
topic partition"));
+        assertTrue(result.unmappedKeys.isEmpty());
+    }
+
+    private DeleteRecordsResponse createResponse(Map<TopicPartition, Short> 
errorsByPartition) {
+        return createResponse(errorsByPartition, recordsToDelete.keySet());
+    }
+
+    private DeleteRecordsResponse createResponse(
+            Map<TopicPartition, Short> errorsByPartition,
+            Set<TopicPartition> topicPartitions
+    ) {
+        Map<String, 
DeleteRecordsResponseData.DeleteRecordsTopicResultCollection> responsesByTopic 
= new HashMap<>();
+
+        DeleteRecordsResponseData.DeleteRecordsTopicResultCollection 
topicResponse = null;
+        for (TopicPartition topicPartition : topicPartitions) {
+            topicResponse = responsesByTopic.computeIfAbsent(
+                    topicPartition.topic(), t -> new 
DeleteRecordsResponseData.DeleteRecordsTopicResultCollection());
+            topicResponse.add(new 
DeleteRecordsResponseData.DeleteRecordsTopicResult().setName(topicPartition.topic()));
+            DeleteRecordsResponseData.DeleteRecordsPartitionResult 
partitionResponse = new 
DeleteRecordsResponseData.DeleteRecordsPartitionResult();
+            partitionResponse.setPartitionIndex(topicPartition.partition());
+            
partitionResponse.setErrorCode(errorsByPartition.getOrDefault(topicPartition, 
(short) 0));
+            
topicResponse.find(topicPartition.topic()).partitions().add(partitionResponse);
+        }
+        DeleteRecordsResponseData responseData = new 
DeleteRecordsResponseData();
+        responseData.setTopics(topicResponse);
+        return new DeleteRecordsResponse(responseData);
+    }
+
+    private AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> 
handleResponse(DeleteRecordsResponse response) {
+        DeleteRecordsHandler handler =
+                new DeleteRecordsHandler(recordsToDelete, logContext, timeout);
+        return handler.handleResponse(node, recordsToDelete.keySet(), 
response);
+    }
+
+    private void assertResult(
+            AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result,
+            Set<TopicPartition> expectedCompleted,
+            Map<TopicPartition, Throwable> expectedFailed,
+            List<TopicPartition> expectedUnmapped,
+            Set<TopicPartition> expectedRetriable
+    ) {
+        assertEquals(expectedCompleted, result.completedKeys.keySet());
+        assertEquals(expectedFailed, result.failedKeys);
+        assertEquals(expectedUnmapped, result.unmappedKeys);
+        Set<TopicPartition> actualRetriable = new 
HashSet<>(recordsToDelete.keySet());
+        actualRetriable.removeAll(result.completedKeys.keySet());
+        actualRetriable.removeAll(result.failedKeys.keySet());
+        actualRetriable.removeAll(new HashSet<>(result.unmappedKeys));
+        assertEquals(expectedRetriable, actualRetriable);
+    }
+}
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index e81a28e2504..561e89ed64a 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -957,15 +957,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       .lowWatermarks.get(topicPartition).get.lowWatermark)
 
     // OffsetOutOfRangeException if offset > high_watermark
-    var cause = assertThrows(classOf[ExecutionException],
+    val cause = assertThrows(classOf[ExecutionException],
       () => client.deleteRecords(Map(topicPartition -> 
RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(topicPartition).get).getCause
     assertEquals(classOf[OffsetOutOfRangeException], cause.getClass)
-
-    val nonExistPartition = new TopicPartition(topic, 3)
-    // LeaderNotAvailableException if non existent partition
-    cause = assertThrows(classOf[ExecutionException],
-      () => client.deleteRecords(Map(nonExistPartition -> 
RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(nonExistPartition).get).getCause
-    assertEquals(classOf[LeaderNotAvailableException], cause.getClass)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)


Reply via email to