This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
     new 7698afc  KAFKA-13059: Make DeleteConsumerGroupOffsetsHandler unmap for COORDINATOR_NOT_AVAILABLE error (#11019)
7698afc is described below

commit 7698afc81976c2f3aab981de40ea6ab44deca6dc
Author: Luke Chen <show...@gmail.com>
AuthorDate: Thu Jul 15 20:18:03 2021 +0800

    KAFKA-13059: Make DeleteConsumerGroupOffsetsHandler unmap for COORDINATOR_NOT_AVAILABLE error (#11019)

    This patch improves the error handling in `DeleteConsumerGroupOffsetsHandler`.
    `COORDINATOR_NOT_AVAILABLE` is now unmapped to trigger a new find coordinator
    request to be sent out.

    Reviewers: David Jacot <dja...@confluent.io>
---
 .../DeleteConsumerGroupOffsetsHandler.java         |  76 +++++++++------
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  18 ++--
 .../DeleteConsumerGroupOffsetsHandlerTest.java     | 104 ++++++++++++++++-----
 3 files changed, 143 insertions(+), 55 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
index 7e8b549..f766a87 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
@@ -19,7 +19,7 @@ package org.apache.kafka.clients.admin.internals;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -72,8 +72,19 @@ public class DeleteConsumerGroupOffsetsHandler implements AdminApiHandler<Coordi
         return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
     }
 
+    private void validateKeys(
+        Set<CoordinatorKey> groupIds
+    ) {
+        if (!groupIds.equals(Collections.singleton(groupId))) {
+            throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
+                " (expected only " + Collections.singleton(groupId) + ")");
+        }
+    }
+
     @Override
-    public OffsetDeleteRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
+    public OffsetDeleteRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> groupIds) {
+        validateKeys(groupIds);
+
         final OffsetDeleteRequestTopicCollection topics = new OffsetDeleteRequestTopicCollection();
         partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> topics.add(
             new OffsetDeleteRequestTopic()
@@ -97,54 +108,67 @@ public class DeleteConsumerGroupOffsetsHandler implements AdminApiHandler<Coordi
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
-        final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        validateKeys(groupIds);
 
+        final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
         final Errors error = Errors.forCode(response.data().errorCode());
+
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+            final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+
+            handleGroupError(groupId, error, failed, groupsToUnmap);
+
+            return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap));
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic ->
+            final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
+            response.data().topics().forEach(topic ->
                 topic.partitions().forEach(partition -> {
                     Errors partitionError = Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
-                    }
+
+                    partitionResults.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
                 })
             );
-            if (!partitions.isEmpty())
-                completed.put(groupId, partitions);
+
+            return new ApiResult<>(
+                Collections.singletonMap(groupId, partitionResults),
+                Collections.emptyMap(),
+                Collections.emptyList()
+            );
         }
-        return new ApiResult<>(completed, failed, unmapped);
     }
 
-    private boolean handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
             case GROUP_ID_NOT_FOUND:
             case INVALID_GROUP_ID:
-                log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId,
-                    error.exception());
+            case NON_EMPTY_GROUP:
+                log.debug("`OffsetDelete` request for group id {} failed due to error {}.", groupId.idValue, error);
                 failed.put(groupId, error.exception());
-                return true;
+                break;
             case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`OffsetDelete` request for group id {} failed because the coordinator" +
+                    " is still in the process of loading state. Will retry.", groupId.idValue);
+                break;
             case COORDINATOR_NOT_AVAILABLE:
-                return true;
             case NOT_COORDINATOR:
-                log.debug("DeleteConsumerGroupOffsets request for group {} returned error {}. Will retry",
-                    groupId, error);
-                unmapped.add(groupId);
-                return true;
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`OffsetDelete` request for group id {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry.", groupId.idValue, error);
+                groupsToUnmap.add(groupId);
+                break;
             default:
-                return false;
+                log.error("`OffsetDelete` request for group id {} failed due to unexpected error {}.", groupId.idValue, error);
+                failed.put(groupId, error.exception());
+                break;
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 53e326a..e79890f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -3290,11 +3290,11 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
-            env.kafkaClient().prepareResponse(prepareOffsetDeleteResponse("foo", 0, Errors.NOT_COORDINATOR));
+            env.kafkaClient().prepareResponse(prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             final DeleteConsumerGroupOffsetsResult result = env.adminClient()
-                .deleteConsumerGroupOffsets("groupId", Stream.of(tp1).collect(Collectors.toSet()));
+                .deleteConsumerGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet()));
 
             TestUtils.assertFutureError(result.all(), TimeoutException.class);
         }
@@ -3322,7 +3322,8 @@ public class KafkaAdminClientTest {
             mockClient.prepareResponse(body -> {
                 firstAttemptTime.set(time.milliseconds());
                 return true;
-            }, prepareOffsetDeleteResponse("foo", 0, Errors.NOT_COORDINATOR));
+            }, prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));
+
             mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
@@ -3402,15 +3403,14 @@ public class KafkaAdminClientTest {
                 prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             env.kafkaClient().prepareResponse(
-                prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE));
-
-            env.kafkaClient().prepareResponse(
                 prepareOffsetDeleteResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
 
             /*
             * We need to return two responses here, one for NOT_COORDINATOR call when calling delete a consumer group
            * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
            * FindCoordinatorResponse.
+            *
+            * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response
             */
             env.kafkaClient().prepareResponse(
                 prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));
@@ -3419,6 +3419,12 @@ public class KafkaAdminClientTest {
                 prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             env.kafkaClient().prepareResponse(
+                prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE));
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            env.kafkaClient().prepareResponse(
                 prepareOffsetDeleteResponse("foo", 0, Errors.NONE));
 
             final DeleteConsumerGroupOffsetsResult errorResult1 = env.adminClient()
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
index 439b377..b4aea93 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
@@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.message.OffsetDeleteResponseData;
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition;
@@ -67,48 +69,88 @@ public class DeleteConsumerGroupOffsetsHandlerTest {
     @Test
     public void testSuccessfulHandleResponse() {
         Map<TopicPartition, Errors> responseData = Collections.singletonMap(t0p0, Errors.NONE);
-        assertCompleted(handleWithError(Errors.NONE), responseData);
+        assertCompleted(handleWithGroupError(Errors.NONE), responseData);
     }
 
     @Test
     public void testUnmappedHandleResponse() {
-        assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
+        assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR));
+        assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE));
     }
 
     @Test
     public void testRetriableHandleResponse() {
-        assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
-        assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
+        assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
     }
 
     @Test
-    public void testFailedHandleResponse() {
-        assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
-        assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND));
-        assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID));
+    public void testFailedHandleResponseWithGroupError() {
+        assertGroupFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED));
+        assertGroupFailed(GroupIdNotFoundException.class, handleWithGroupError(Errors.GROUP_ID_NOT_FOUND));
+        assertGroupFailed(InvalidGroupIdException.class, handleWithGroupError(Errors.INVALID_GROUP_ID));
+        assertGroupFailed(GroupNotEmptyException.class, handleWithGroupError(Errors.NON_EMPTY_GROUP));
     }
 
-    private OffsetDeleteResponse buildResponse(Errors error) {
+    @Test
+    public void testFailedHandleResponseWithPartitionError() {
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.GROUP_SUBSCRIBED_TO_TOPIC),
+            handleWithPartitionError(Errors.GROUP_SUBSCRIBED_TO_TOPIC));
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.TOPIC_AUTHORIZATION_FAILED),
+            handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED));
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.UNKNOWN_TOPIC_OR_PARTITION),
+            handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION));
+    }
+
+    private OffsetDeleteResponse buildGroupErrorResponse(Errors error) {
+        OffsetDeleteResponse response = new OffsetDeleteResponse(
+            new OffsetDeleteResponseData()
+                .setErrorCode(error.code()));
+        if (error == Errors.NONE) {
+            response.data()
+                .setThrottleTimeMs(0)
+                .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
+                    new OffsetDeleteResponseTopic()
+                        .setName(t0p0.topic())
+                        .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
+                            new OffsetDeleteResponsePartition()
+                                .setPartitionIndex(t0p0.partition())
+                                .setErrorCode(error.code())
+                        ).iterator()))
+                ).iterator()));
+        }
+        return response;
+    }
+
+    private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) {
         OffsetDeleteResponse response = new OffsetDeleteResponse(
-            new OffsetDeleteResponseData()
-                .setThrottleTimeMs(0)
-                .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
-                    new OffsetDeleteResponseTopic()
-                        .setName("t0")
-                        .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
-                            new OffsetDeleteResponsePartition()
-                                .setPartitionIndex(0)
-                                .setErrorCode(error.code())
-                        ).iterator()))
-                ).iterator())));
+            new OffsetDeleteResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
+                    new OffsetDeleteResponseTopic()
+                        .setName(t0p0.topic())
+                        .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
+                            new OffsetDeleteResponsePartition()
+                                .setPartitionIndex(t0p0.partition())
+                                .setErrorCode(error.code())
+                        ).iterator()))
+                ).iterator()))
+        );
         return response;
     }
 
-    private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithError(
+    private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithGroupError(
         Errors error
     ) {
         DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext);
-        OffsetDeleteResponse response = buildResponse(error);
+        OffsetDeleteResponse response = buildGroupErrorResponse(error);
+        return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
+    }
+
+    private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithPartitionError(
+        Errors error
+    ) {
+        DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext);
+        OffsetDeleteResponse response = buildPartitionErrorResponse(error);
         return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
     }
 
@@ -139,7 +181,7 @@ public class DeleteConsumerGroupOffsetsHandlerTest {
         assertEquals(expected, result.completedKeys.get(key));
     }
 
-    private void assertFailed(
+    private void assertGroupFailed(
         Class<? extends Throwable> expectedExceptionType,
         AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> result
     ) {
@@ -149,4 +191,20 @@ public class DeleteConsumerGroupOffsetsHandlerTest {
         assertEquals(singleton(key), result.failedKeys.keySet());
         assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key)));
     }
+
+    private void assertPartitionFailed(
+        Map<TopicPartition, Errors> expectedResult,
+        AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> result
+    ) {
+        CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
+        assertEquals(singleton(key), result.completedKeys.keySet());
+
+        // verify the completed value is expected result
+        Collection<Map<TopicPartition, Errors>> completeCollection = result.completedKeys.values();
+        assertEquals(1, completeCollection.size());
+        assertEquals(expectedResult, result.completedKeys.get(key));
+
+        assertEquals(emptyList(), result.unmappedKeys);
+        assertEquals(emptySet(), result.failedKeys.keySet());
+    }
 }
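For context on the behaviour this patch changes, the sketch below (not part of the commit) shows the public Admin API call that ultimately exercises `DeleteConsumerGroupOffsetsHandler`. The bootstrap address "localhost:9092", the group id "my-group", the topic partition "foo"/0, and the class name DeleteOffsetsExample are placeholder values chosen for illustration.

// Usage sketch, assuming a reachable broker at the placeholder address.
import java.util.Collections;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
import org.apache.kafka.common.TopicPartition;

public class DeleteOffsetsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("foo", 0));

            // DeleteConsumerGroupOffsetsHandler runs behind this call. With the patch above,
            // a group-level COORDINATOR_NOT_AVAILABLE (like NOT_COORDINATOR) unmaps the group
            // key, so the admin client sends a new FindCoordinator request and retries the
            // OffsetDelete, instead of only retrying against the same, possibly stale, node.
            DeleteConsumerGroupOffsetsResult result =
                admin.deleteConsumerGroupOffsets("my-group", partitions);

            // Fails on non-retriable errors (e.g. GROUP_ID_NOT_FOUND, NON_EMPTY_GROUP,
            // per-partition errors) or if the overall operation times out.
            result.all().get();
        }
    }
}

The design choice the patch reflects: COORDINATOR_LOAD_IN_PROGRESS only needs a plain retry against the same coordinator, whereas COORDINATOR_NOT_AVAILABLE and NOT_COORDINATOR indicate the coordinator itself may have moved, so the group key is unmapped to force coordinator rediscovery before the retry.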