This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b2d647904c0 KAFKA-8982: Add retry of fetching metadata to
Admin.deleteRecords (#13760)
b2d647904c0 is described below
commit b2d647904c0d9e02509bc303ae42d880a90d4742
Author: Gantigmaa Selenge <[email protected]>
AuthorDate: Mon Jul 3 02:13:55 2023 +0100
KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords (#13760)
Use AdminApiDriver class to refresh the metadata and retry the request that
failed with retriable errors.
Reviewers: Luke Chen <[email protected]>, Divij Vaidya <[email protected]>,
Mickael Maison <[email protected]>, Dimitar Dimitrov
<[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 131 +----------
.../admin/internals/DeleteRecordsHandler.java | 178 +++++++++++++++
.../kafka/clients/admin/KafkaAdminClientTest.java | 64 ++----
.../admin/internals/DeleteRecordsHandlerTest.java | 249 +++++++++++++++++++++
.../kafka/api/PlaintextAdminIntegrationTest.scala | 8 +-
5 files changed, 452 insertions(+), 178 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f2294df3a3a..140021a4c72 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -44,6 +44,7 @@ import
org.apache.kafka.clients.admin.internals.AlterConsumerGroupOffsetsHandler
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import
org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
+import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
@@ -121,11 +122,6 @@ import
org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
import org.apache.kafka.common.message.DeleteAclsResponseData;
import
org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult;
import
org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsMatchingAcl;
-import org.apache.kafka.common.message.DeleteRecordsRequestData;
-import
org.apache.kafka.common.message.DeleteRecordsRequestData.DeleteRecordsPartition;
-import
org.apache.kafka.common.message.DeleteRecordsRequestData.DeleteRecordsTopic;
-import org.apache.kafka.common.message.DeleteRecordsResponseData;
-import
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsTopicResult;
import org.apache.kafka.common.message.DeleteTopicsRequestData;
import
org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState;
import
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
@@ -186,8 +182,6 @@ import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteAclsRequest;
import org.apache.kafka.common.requests.DeleteAclsResponse;
-import org.apache.kafka.common.requests.DeleteRecordsRequest;
-import org.apache.kafka.common.requests.DeleteRecordsResponse;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.DescribeAclsRequest;
@@ -2917,6 +2911,7 @@ public class KafkaAdminClient extends AdminClient {
maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
throwable, futures, quotaExceededExceptions, (int)
(time.milliseconds() - now));
// Fail all the other remaining futures
+
completeAllExceptionally(futures.values(), throwable);
}
};
@@ -2925,123 +2920,15 @@ public class KafkaAdminClient extends AdminClient {
@Override
public DeleteRecordsResult deleteRecords(final Map<TopicPartition,
RecordsToDelete> recordsToDelete,
final DeleteRecordsOptions
options) {
-
- // requests need to be sent to partitions leader nodes so ...
- // ... from the provided map it's needed to create more maps grouping
topic/partition per leader
-
- final Map<TopicPartition, KafkaFutureImpl<DeletedRecords>> futures =
new HashMap<>(recordsToDelete.size());
- for (TopicPartition topicPartition: recordsToDelete.keySet()) {
- futures.put(topicPartition, new KafkaFutureImpl<>());
+ SimpleAdminApiFuture<TopicPartition, DeletedRecords> future =
DeleteRecordsHandler.newFuture(recordsToDelete.keySet());
+ int timeoutMs = defaultApiTimeoutMs;
+ if (options.timeoutMs() != null) {
+ timeoutMs = options.timeoutMs();
}
+ DeleteRecordsHandler handler = new
DeleteRecordsHandler(recordsToDelete, logContext, timeoutMs);
+ invokeDriver(handler, future, options.timeoutMs);
- // preparing topics list for asking metadata about them
- final Set<String> topics = new HashSet<>();
- for (TopicPartition topicPartition: recordsToDelete.keySet()) {
- topics.add(topicPartition.topic());
- }
-
- final long nowMetadata = time.milliseconds();
- final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
- // asking for topics metadata for getting partitions leaders
- runnable.call(new Call("topicsMetadata", deadline,
- new LeastLoadedNodeProvider()) {
-
- @Override
- MetadataRequest.Builder createRequest(int timeoutMs) {
- return new MetadataRequest.Builder(new MetadataRequestData()
- .setTopics(convertToMetadataRequestTopic(topics))
- .setAllowAutoTopicCreation(false));
- }
-
- @Override
- void handleResponse(AbstractResponse abstractResponse) {
- MetadataResponse response = (MetadataResponse)
abstractResponse;
-
- Map<String, Errors> errors = response.errors();
- Cluster cluster = response.buildCluster();
-
- // Group topic partitions by leader
- Map<Node, Map<String, DeleteRecordsTopic>> leaders = new
HashMap<>();
- for (Map.Entry<TopicPartition, RecordsToDelete> entry:
recordsToDelete.entrySet()) {
- TopicPartition topicPartition = entry.getKey();
- KafkaFutureImpl<DeletedRecords> future =
futures.get(topicPartition);
-
- // Fail partitions with topic errors
- Errors topicError = errors.get(topicPartition.topic());
- if (errors.containsKey(topicPartition.topic())) {
- future.completeExceptionally(topicError.exception());
- } else {
- Node node = cluster.leaderFor(topicPartition);
- if (node != null) {
- Map<String, DeleteRecordsTopic> deletionsForLeader
= leaders.computeIfAbsent(
- node, key -> new HashMap<>());
- DeleteRecordsTopic deleteRecords =
deletionsForLeader.get(topicPartition.topic());
- if (deleteRecords == null) {
- deleteRecords = new DeleteRecordsTopic()
- .setName(topicPartition.topic());
- deletionsForLeader.put(topicPartition.topic(),
deleteRecords);
- }
- deleteRecords.partitions().add(new
DeleteRecordsPartition()
-
.setPartitionIndex(topicPartition.partition())
-
.setOffset(entry.getValue().beforeOffset()));
- } else {
-
future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception());
- }
- }
- }
-
- final long deleteRecordsCallTimeMs = time.milliseconds();
-
- for (final Map.Entry<Node, Map<String, DeleteRecordsTopic>>
entry : leaders.entrySet()) {
- final Map<String, DeleteRecordsTopic>
partitionDeleteOffsets = entry.getValue();
- final int brokerId = entry.getKey().id();
-
- runnable.call(new Call("deleteRecords", deadline,
- new ConstantNodeIdProvider(brokerId)) {
-
- @Override
- DeleteRecordsRequest.Builder createRequest(int
timeoutMs) {
- return new DeleteRecordsRequest.Builder(new
DeleteRecordsRequestData()
- .setTimeoutMs(timeoutMs)
- .setTopics(new
ArrayList<>(partitionDeleteOffsets.values())));
- }
-
- @Override
- void handleResponse(AbstractResponse abstractResponse)
{
- DeleteRecordsResponse response =
(DeleteRecordsResponse) abstractResponse;
- for (DeleteRecordsTopicResult topicResult:
response.data().topics()) {
- for
(DeleteRecordsResponseData.DeleteRecordsPartitionResult partitionResult :
topicResult.partitions()) {
- KafkaFutureImpl<DeletedRecords> future =
futures.get(new TopicPartition(topicResult.name(),
partitionResult.partitionIndex()));
- if (partitionResult.errorCode() ==
Errors.NONE.code()) {
- future.complete(new
DeletedRecords(partitionResult.lowWatermark()));
- } else {
-
future.completeExceptionally(Errors.forCode(partitionResult.errorCode()).exception());
- }
- }
- }
- }
-
- @Override
- void handleFailure(Throwable throwable) {
- Stream<KafkaFutureImpl<DeletedRecords>>
callFutures =
-
partitionDeleteOffsets.values().stream().flatMap(
- recordsToDelete ->
-
recordsToDelete.partitions().stream().map(partitionsToDelete ->
- new
TopicPartition(recordsToDelete.name(), partitionsToDelete.partitionIndex()))
- ).map(futures::get);
- completeAllExceptionally(callFutures, throwable);
- }
- }, deleteRecordsCallTimeMs);
- }
- }
-
- @Override
- void handleFailure(Throwable throwable) {
- completeAllExceptionally(futures.values(), throwable);
- }
- }, nowMetadata);
-
- return new DeleteRecordsResult(new HashMap<>(futures));
+ return new DeleteRecordsResult(future.all());
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
new file mode 100644
index 00000000000..2daad226034
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.admin.DeletedRecords;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.DeleteRecordsRequestData;
+import org.apache.kafka.common.message.DeleteRecordsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DeleteRecordsRequest;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public final class DeleteRecordsHandler extends Batched<TopicPartition,
DeletedRecords> {
+
+ private final Map<TopicPartition, RecordsToDelete> recordsToDelete;
+ private final Logger log;
+ private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
+
+ private final int timeout;
+
+ public DeleteRecordsHandler(
+ Map<TopicPartition, RecordsToDelete> recordsToDelete,
+ LogContext logContext, int timeout
+ ) {
+ this.recordsToDelete = recordsToDelete;
+ this.log = logContext.logger(DeleteRecordsHandler.class);
+ this.lookupStrategy = new PartitionLeaderStrategy(logContext);
+ this.timeout = timeout;
+ }
+
+ @Override
+ public String apiName() {
+ return "deleteRecords";
+ }
+
+ @Override
+ public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
+ return this.lookupStrategy;
+ }
+
+ public static SimpleAdminApiFuture<TopicPartition, DeletedRecords>
newFuture(
+ Collection<TopicPartition> topicPartitions
+ ) {
+ return AdminApiFuture.forKeys(new HashSet<>(topicPartitions));
+ }
+
+ @Override
+ public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId,
Set<TopicPartition> keys) {
+ Map<String, DeleteRecordsRequestData.DeleteRecordsTopic>
deletionsForTopic = new HashMap<>();
+ for (Map.Entry<TopicPartition, RecordsToDelete> entry:
recordsToDelete.entrySet()) {
+ TopicPartition topicPartition = entry.getKey();
+ DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords =
deletionsForTopic.computeIfAbsent(
+ topicPartition.topic(),
+ key -> new
DeleteRecordsRequestData.DeleteRecordsTopic().setName(topicPartition.topic())
+ );
+ deleteRecords.partitions().add(new
DeleteRecordsRequestData.DeleteRecordsPartition()
+ .setPartitionIndex(topicPartition.partition())
+ .setOffset(entry.getValue().beforeOffset()));
+ }
+
+ DeleteRecordsRequestData data = new DeleteRecordsRequestData()
+ .setTopics(new ArrayList<>(deletionsForTopic.values()))
+ .setTimeoutMs(timeout);
+ return new DeleteRecordsRequest.Builder(data);
+ }
+
+
+ @Override
+ public ApiResult<TopicPartition, DeletedRecords> handleResponse(
+ Node broker,
+ Set<TopicPartition> keys,
+ AbstractResponse abstractResponse
+ ) {
+ DeleteRecordsResponse response = (DeleteRecordsResponse)
abstractResponse;
+ Map<TopicPartition, DeletedRecords> completed = new HashMap<>();
+ Map<TopicPartition, Throwable> failed = new HashMap<>();
+ List<TopicPartition> unmapped = new ArrayList<>();
+ Set<TopicPartition> retriable = new HashSet<>();
+
+ for (DeleteRecordsResponseData.DeleteRecordsTopicResult topicResult:
response.data().topics()) {
+ for (DeleteRecordsResponseData.DeleteRecordsPartitionResult
partitionResult : topicResult.partitions()) {
+ Errors error = Errors.forCode(partitionResult.errorCode());
+ TopicPartition topicPartition = new
TopicPartition(topicResult.name(), partitionResult.partitionIndex());
+ if (error == Errors.NONE) {
+ completed.put(topicPartition, new
DeletedRecords(partitionResult.lowWatermark()));
+ } else {
+ handlePartitionError(topicPartition, error, failed,
unmapped, retriable);
+ }
+ }
+ }
+
+ // Sanity-check if the current leader for these partitions returned
results for all of them
+ for (TopicPartition topicPartition : keys) {
+ if (unmapped.isEmpty()
+ && !completed.containsKey(topicPartition)
+ && !failed.containsKey(topicPartition)
+ && !retriable.contains(topicPartition)
+ ) {
+ ApiException sanityCheckException = new ApiException(
+ "The response from broker " + broker.id() +
+ " did not contain a result for topic partition
" + topicPartition);
+ log.error(
+ "DeleteRecords request for topic partition {} failed
sanity check",
+ topicPartition,
+ sanityCheckException);
+ failed.put(topicPartition, sanityCheckException);
+ }
+ }
+
+ return new ApiResult<>(completed, failed, unmapped);
+ }
+
+ private void handlePartitionError(
+ TopicPartition topicPartition,
+ Errors error,
+ Map<TopicPartition, Throwable> failed,
+ List<TopicPartition> unmapped,
+ Set<TopicPartition> retriable
+ ) {
+ if (error.exception() instanceof InvalidMetadataException) {
+ log.debug(
+ "DeleteRecords lookup request for topic partition {} will be
retried due to invalid leader metadata {}",
+ topicPartition,
+ error);
+ unmapped.add(topicPartition);
+ } else if (error.exception() instanceof RetriableException) {
+ log.debug(
+ "DeleteRecords fulfillment request for topic partition {} will
be retried due to {}",
+ topicPartition,
+ error);
+ retriable.add(topicPartition);
+ } else if (error.exception() instanceof TopicAuthorizationException) {
+ log.error(
+ "DeleteRecords request for topic partition {} failed due to an
error {}",
+ topicPartition,
+ error);
+ failed.put(topicPartition, error.exception());
+ } else {
+ log.error(
+ "DeleteRecords request for topic partition {} failed due to an
unexpected error {}",
+ topicPartition,
+ error);
+ failed.put(topicPartition, error.exception());
+ }
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 27ae3471f4a..cc68f154b02 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -53,9 +53,7 @@ import
org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
@@ -2275,9 +2273,9 @@ public class KafkaAdminClientTest {
List<PartitionInfo> partitionInfos = new ArrayList<>();
partitionInfos.add(new PartitionInfo("my_topic", 0, nodes.get(0), new
Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
partitionInfos.add(new PartitionInfo("my_topic", 1, nodes.get(0), new
Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
- partitionInfos.add(new PartitionInfo("my_topic", 2, null, new Node[]
{nodes.get(0)}, new Node[] {nodes.get(0)}));
+ partitionInfos.add(new PartitionInfo("my_topic", 2, nodes.get(0), new
Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
partitionInfos.add(new PartitionInfo("my_topic", 3, nodes.get(0), new
Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
- partitionInfos.add(new PartitionInfo("my_topic", 4, nodes.get(0), new
Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
+
Cluster cluster = new Cluster("mockClusterId", nodes.values(),
partitionInfos, Collections.emptySet(),
Collections.emptySet(), nodes.get(0));
@@ -2286,11 +2284,15 @@ public class KafkaAdminClientTest {
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3);
- TopicPartition myTopicPartition4 = new TopicPartition("my_topic", 4);
+
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster))
{
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster,
Errors.LEADER_NOT_AVAILABLE));
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster,
Errors.UNKNOWN_TOPIC_OR_PARTITION));
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster,
Errors.NONE));
+
DeleteRecordsResponseData m = new DeleteRecordsResponseData();
m.topics().add(new
DeleteRecordsResponseData.DeleteRecordsTopicResult().setName(myTopicPartition0.topic())
.setPartitions(new
DeleteRecordsResponseData.DeleteRecordsPartitionResultCollection(asList(
@@ -2303,36 +2305,10 @@ public class KafkaAdminClientTest {
.setLowWatermark(DeleteRecordsResponse.INVALID_LOW_WATERMARK)
.setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code()),
new
DeleteRecordsResponseData.DeleteRecordsPartitionResult()
- .setPartitionIndex(myTopicPartition3.partition())
+ .setPartitionIndex(myTopicPartition2.partition())
.setLowWatermark(DeleteRecordsResponse.INVALID_LOW_WATERMARK)
-
.setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()),
- new
DeleteRecordsResponseData.DeleteRecordsPartitionResult()
- .setPartitionIndex(myTopicPartition4.partition())
-
.setLowWatermark(DeleteRecordsResponse.INVALID_LOW_WATERMARK)
-
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code())
).iterator())));
-
- List<MetadataResponse.TopicMetadata> t = new ArrayList<>();
- List<MetadataResponse.PartitionMetadata> p = new ArrayList<>();
- p.add(new MetadataResponse.PartitionMetadata(Errors.NONE,
myTopicPartition0,
- Optional.of(nodes.get(0).id()), Optional.of(5),
singletonList(nodes.get(0).id()),
- singletonList(nodes.get(0).id()),
Collections.emptyList()));
- p.add(new MetadataResponse.PartitionMetadata(Errors.NONE,
myTopicPartition1,
- Optional.of(nodes.get(0).id()), Optional.of(5),
singletonList(nodes.get(0).id()),
- singletonList(nodes.get(0).id()),
Collections.emptyList()));
- p.add(new
MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE,
myTopicPartition2,
- Optional.empty(), Optional.empty(),
singletonList(nodes.get(0).id()),
- singletonList(nodes.get(0).id()),
Collections.emptyList()));
- p.add(new MetadataResponse.PartitionMetadata(Errors.NONE,
myTopicPartition3,
- Optional.of(nodes.get(0).id()), Optional.of(5),
singletonList(nodes.get(0).id()),
- singletonList(nodes.get(0).id()),
Collections.emptyList()));
- p.add(new MetadataResponse.PartitionMetadata(Errors.NONE,
myTopicPartition4,
- Optional.of(nodes.get(0).id()), Optional.of(5),
singletonList(nodes.get(0).id()),
- singletonList(nodes.get(0).id()),
Collections.emptyList()));
-
- t.add(new MetadataResponse.TopicMetadata(Errors.NONE, "my_topic",
false, p));
-
-
env.kafkaClient().prepareResponse(RequestTestUtils.metadataResponse(cluster.nodes(),
cluster.clusterResource().clusterId(), cluster.controller().id(), t));
env.kafkaClient().prepareResponse(new DeleteRecordsResponse(m));
Map<TopicPartition, RecordsToDelete> recordsToDelete = new
HashMap<>();
@@ -2340,15 +2316,14 @@ public class KafkaAdminClientTest {
recordsToDelete.put(myTopicPartition1,
RecordsToDelete.beforeOffset(10L));
recordsToDelete.put(myTopicPartition2,
RecordsToDelete.beforeOffset(10L));
recordsToDelete.put(myTopicPartition3,
RecordsToDelete.beforeOffset(10L));
- recordsToDelete.put(myTopicPartition4,
RecordsToDelete.beforeOffset(10L));
DeleteRecordsResult results =
env.adminClient().deleteRecords(recordsToDelete);
// success on records deletion for partition 0
Map<TopicPartition, KafkaFuture<DeletedRecords>> values =
results.lowWatermarks();
KafkaFuture<DeletedRecords> myTopicPartition0Result =
values.get(myTopicPartition0);
- long lowWatermark = myTopicPartition0Result.get().lowWatermark();
- assertEquals(lowWatermark, 3);
+ long myTopicPartition0lowWatermark =
myTopicPartition0Result.get().lowWatermark();
+ assertEquals(3, myTopicPartition0lowWatermark);
// "offset out of range" failure on records deletion for partition
1
KafkaFuture<DeletedRecords> myTopicPartition1Result =
values.get(myTopicPartition1);
@@ -2359,31 +2334,22 @@ public class KafkaAdminClientTest {
assertTrue(e0.getCause() instanceof OffsetOutOfRangeException);
}
- // "leader not available" failure on metadata request for
partition 2
+ // not authorized to delete records for partition 2
KafkaFuture<DeletedRecords> myTopicPartition2Result =
values.get(myTopicPartition2);
try {
myTopicPartition2Result.get();
fail("get() should throw ExecutionException");
} catch (ExecutionException e1) {
- assertTrue(e1.getCause() instanceof
LeaderNotAvailableException);
+ assertTrue(e1.getCause() instanceof
TopicAuthorizationException);
}
- // "not leader for partition" failure on records deletion for
partition 3
+ // the response does not contain a result for partition 3
KafkaFuture<DeletedRecords> myTopicPartition3Result =
values.get(myTopicPartition3);
try {
myTopicPartition3Result.get();
fail("get() should throw ExecutionException");
} catch (ExecutionException e1) {
- assertTrue(e1.getCause() instanceof
NotLeaderOrFollowerException);
- }
-
- // "unknown topic or partition" failure on records deletion for
partition 4
- KafkaFuture<DeletedRecords> myTopicPartition4Result =
values.get(myTopicPartition4);
- try {
- myTopicPartition4Result.get();
- fail("get() should throw ExecutionException");
- } catch (ExecutionException e1) {
- assertTrue(e1.getCause() instanceof
UnknownTopicOrPartitionException);
+ assertTrue(e1.getCause() instanceof ApiException);
}
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
new file mode 100644
index 00000000000..c39747f1fba
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.admin.DeletedRecords;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.DeleteRecordsRequestData;
+import org.apache.kafka.common.message.DeleteRecordsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteRecordsRequest;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DeleteRecordsHandlerTest {
+ private final LogContext logContext = new LogContext();
+ private final int timeout = 2000;
+ private final TopicPartition t0p0 = new TopicPartition("t0", 0);
+ private final TopicPartition t0p1 = new TopicPartition("t0", 1);
+ private final TopicPartition t0p2 = new TopicPartition("t0", 2);
+ private final TopicPartition t0p3 = new TopicPartition("t0", 3);
+ private final Node node = new Node(1, "host", 1234);
+ private final Map<TopicPartition, RecordsToDelete> recordsToDelete = new
HashMap<TopicPartition, RecordsToDelete>() {
+ {
+ put(t0p0, RecordsToDelete.beforeOffset(10L));
+ put(t0p1, RecordsToDelete.beforeOffset(10L));
+ put(t0p2, RecordsToDelete.beforeOffset(10L));
+ put(t0p3, RecordsToDelete.beforeOffset(10L));
+ }
+ };
+
+ @Test
+ public void testBuildRequestSimple() {
+ DeleteRecordsHandler handler = new
DeleteRecordsHandler(recordsToDelete, logContext, timeout);
+ DeleteRecordsRequest request = handler.buildBatchedRequest(node.id(),
mkSet(t0p0, t0p1)).build();
+ List<DeleteRecordsRequestData.DeleteRecordsTopic> topicPartitions =
request.data().topics();
+ assertEquals(1, topicPartitions.size());
+ DeleteRecordsRequestData.DeleteRecordsTopic topic =
topicPartitions.get(0);
+ assertEquals(4, topic.partitions().size());
+ }
+
+ @Test
+ public void testHandleSuccessfulResponse() {
+ AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result =
+ handleResponse(createResponse(emptyMap(),
recordsToDelete.keySet()));
+ assertResult(result, recordsToDelete.keySet(), emptyMap(),
emptyList(), emptySet());
+ }
+
+ @Test
+ public void testHandleRetriablePartitionTimeoutResponse() {
+ TopicPartition errorPartition = t0p0;
+ Map<TopicPartition, Short> errorsByPartition = new HashMap<>();
+ errorsByPartition.put(errorPartition, Errors.REQUEST_TIMED_OUT.code());
+
+ AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result =
+ handleResponse(createResponse(errorsByPartition));
+
+ // Timeouts should be retried within the fulfillment stage as they are
a common type of
+ // retriable error.
+ Set<TopicPartition> retriable = singleton(errorPartition);
+ Set<TopicPartition> completed = new
HashSet<>(recordsToDelete.keySet());
+ completed.removeAll(retriable);
+ assertResult(result, completed, emptyMap(), emptyList(), retriable);
+ }
+
+ @Test
+ public void testHandleLookupRetriablePartitionInvalidMetadataResponse() {
+ TopicPartition errorPartition = t0p0;
+ Errors error = Errors.NOT_LEADER_OR_FOLLOWER;
+ Map<TopicPartition, Short> errorsByPartition = new HashMap<>();
+ errorsByPartition.put(errorPartition, error.code());
+
+ AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result =
+ handleResponse(createResponse(errorsByPartition));
+
+ // Some invalid metadata errors should be retried from the lookup
stage as the partition-to-leader
+ // mappings should be recalculated.
+ List<TopicPartition> unmapped = new ArrayList<>();
+ unmapped.add(errorPartition);
+ Set<TopicPartition> completed = new
HashSet<>(recordsToDelete.keySet());
+ completed.removeAll(unmapped);
+ assertResult(result, completed, emptyMap(), unmapped, emptySet());
+ }
+
+ @Test
+ public void testHandlePartitionErrorResponse() {
+ TopicPartition errorPartition = t0p0;
+ Errors error = Errors.TOPIC_AUTHORIZATION_FAILED;
+ Map<TopicPartition, Short> errorsByPartition = new HashMap<>();
+ errorsByPartition.put(errorPartition, error.code());
+
+ AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result =
+ handleResponse(createResponse(errorsByPartition));
+
+ Map<TopicPartition, Throwable> failed = new HashMap<>();
+ failed.put(errorPartition, error.exception());
+ Set<TopicPartition> completed = new
HashSet<>(recordsToDelete.keySet());
+ completed.removeAll(failed.keySet());
+ assertResult(result, completed, failed, emptyList(), emptySet());
+ }
+
+ @Test
+ public void testHandleUnexpectedPartitionErrorResponse() {
+ TopicPartition errorPartition = t0p0;
+ Errors error = Errors.UNKNOWN_SERVER_ERROR;
+ Map<TopicPartition, Short> errorsByPartition = new HashMap<>();
+ errorsByPartition.put(errorPartition, error.code());
+
+ AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result =
+ handleResponse(createResponse(errorsByPartition));
+
+ Map<TopicPartition, Throwable> failed = new HashMap<>();
+ failed.put(errorPartition, error.exception());
+ Set<TopicPartition> completed = new
HashSet<>(recordsToDelete.keySet());
+ completed.removeAll(failed.keySet());
+ assertResult(result, completed, failed, emptyList(), emptySet());
+ }
+
+ @Test
+ public void testMixedResponse() {
+ Map<TopicPartition, Short> errorsByPartition = new HashMap<>();
+
+ TopicPartition errorPartition = t0p0;
+ Errors error = Errors.UNKNOWN_SERVER_ERROR;
+ errorsByPartition.put(errorPartition, error.code());
+
+ TopicPartition retriableErrorPartition = t0p1;
+ Errors retriableError = Errors.NOT_LEADER_OR_FOLLOWER;
+ errorsByPartition.put(retriableErrorPartition, retriableError.code());
+
+ TopicPartition retriableErrorPartition2 = t0p2;
+ Errors retriableError2 = Errors.REQUEST_TIMED_OUT;
+ errorsByPartition.put(retriableErrorPartition2,
retriableError2.code());
+
+ AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result =
+ handleResponse(createResponse(errorsByPartition));
+
+ Set<TopicPartition> completed = new
HashSet<>(recordsToDelete.keySet());
+
+ Map<TopicPartition, Throwable> failed = new HashMap<>();
+ failed.put(errorPartition, error.exception());
+ completed.removeAll(failed.keySet());
+
+ List<TopicPartition> unmapped = new ArrayList<>();
+ unmapped.add(retriableErrorPartition);
+ completed.removeAll(unmapped);
+
+ Set<TopicPartition> retriable = singleton(retriableErrorPartition2);
+ completed.removeAll(retriable);
+
+ assertResult(result, completed, failed, unmapped, retriable);
+ }
+
+ @Test
+ public void testHandleResponseSanityCheck() {
+ TopicPartition errorPartition = t0p0;
+ Map<TopicPartition, RecordsToDelete> recordsToDeleteMap = new
HashMap<>(recordsToDelete);
+ recordsToDeleteMap.remove(errorPartition);
+
+ AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result =
+ handleResponse(createResponse(emptyMap(),
recordsToDeleteMap.keySet()));
+
+ assertEquals(recordsToDelete.size() - 1, result.completedKeys.size());
+ assertEquals(1, result.failedKeys.size());
+ assertEquals(errorPartition,
result.failedKeys.keySet().iterator().next());
+ String sanityCheckMessage =
result.failedKeys.get(errorPartition).getMessage();
+ assertTrue(sanityCheckMessage.contains("did not contain a result for
topic partition"));
+ assertTrue(result.unmappedKeys.isEmpty());
+ }
+
    // Convenience overload: builds a response covering every partition under
    // test, applying the given per-partition error codes.
    private DeleteRecordsResponse createResponse(Map<TopicPartition, Short> errorsByPartition) {
        return createResponse(errorsByPartition, recordsToDelete.keySet());
    }
+
+ private DeleteRecordsResponse createResponse(
+ Map<TopicPartition, Short> errorsByPartition,
+ Set<TopicPartition> topicPartitions
+ ) {
+ Map<String,
DeleteRecordsResponseData.DeleteRecordsTopicResultCollection> responsesByTopic
= new HashMap<>();
+
+ DeleteRecordsResponseData.DeleteRecordsTopicResultCollection
topicResponse = null;
+ for (TopicPartition topicPartition : topicPartitions) {
+ topicResponse = responsesByTopic.computeIfAbsent(
+ topicPartition.topic(), t -> new
DeleteRecordsResponseData.DeleteRecordsTopicResultCollection());
+ topicResponse.add(new
DeleteRecordsResponseData.DeleteRecordsTopicResult().setName(topicPartition.topic()));
+ DeleteRecordsResponseData.DeleteRecordsPartitionResult
partitionResponse = new
DeleteRecordsResponseData.DeleteRecordsPartitionResult();
+ partitionResponse.setPartitionIndex(topicPartition.partition());
+
partitionResponse.setErrorCode(errorsByPartition.getOrDefault(topicPartition,
(short) 0));
+
topicResponse.find(topicPartition.topic()).partitions().add(partitionResponse);
+ }
+ DeleteRecordsResponseData responseData = new
DeleteRecordsResponseData();
+ responseData.setTopics(topicResponse);
+ return new DeleteRecordsResponse(responseData);
+ }
+
+ private AdminApiHandler.ApiResult<TopicPartition, DeletedRecords>
handleResponse(DeleteRecordsResponse response) {
+ DeleteRecordsHandler handler =
+ new DeleteRecordsHandler(recordsToDelete, logContext, timeout);
+ return handler.handleResponse(node, recordsToDelete.keySet(),
response);
+ }
+
+ private void assertResult(
+ AdminApiHandler.ApiResult<TopicPartition, DeletedRecords> result,
+ Set<TopicPartition> expectedCompleted,
+ Map<TopicPartition, Throwable> expectedFailed,
+ List<TopicPartition> expectedUnmapped,
+ Set<TopicPartition> expectedRetriable
+ ) {
+ assertEquals(expectedCompleted, result.completedKeys.keySet());
+ assertEquals(expectedFailed, result.failedKeys);
+ assertEquals(expectedUnmapped, result.unmappedKeys);
+ Set<TopicPartition> actualRetriable = new
HashSet<>(recordsToDelete.keySet());
+ actualRetriable.removeAll(result.completedKeys.keySet());
+ actualRetriable.removeAll(result.failedKeys.keySet());
+ actualRetriable.removeAll(new HashSet<>(result.unmappedKeys));
+ assertEquals(expectedRetriable, actualRetriable);
+ }
+}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index e81a28e2504..561e89ed64a 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -957,15 +957,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
.lowWatermarks.get(topicPartition).get.lowWatermark)
// OffsetOutOfRangeException if offset > high_watermark
- var cause = assertThrows(classOf[ExecutionException],
+ val cause = assertThrows(classOf[ExecutionException],
() => client.deleteRecords(Map(topicPartition ->
RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(topicPartition).get).getCause
assertEquals(classOf[OffsetOutOfRangeException], cause.getClass)
-
- val nonExistPartition = new TopicPartition(topic, 3)
- // LeaderNotAvailableException if non existent partition
- cause = assertThrows(classOf[ExecutionException],
- () => client.deleteRecords(Map(nonExistPartition ->
RecordsToDelete.beforeOffset(20L)).asJava).lowWatermarks.get(nonExistPartition).get).getCause
- assertEquals(classOf[LeaderNotAvailableException], cause.getClass)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)