This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 7698afc  KAFKA-13059: Make DeleteConsumerGroupOffsetsHandler unmap for COORDINATOR_NOT_AVAILABLE error (#11019)
7698afc is described below

commit 7698afc81976c2f3aab981de40ea6ab44deca6dc
Author: Luke Chen <show...@gmail.com>
AuthorDate: Thu Jul 15 20:18:03 2021 +0800

    KAFKA-13059: Make DeleteConsumerGroupOffsetsHandler unmap for COORDINATOR_NOT_AVAILABLE error (#11019)
    
    This patch improves the error handling in `DeleteConsumerGroupOffsetsHandler`. `COORDINATOR_NOT_AVAILABLE` is now unmapped to trigger a new `FindCoordinator` request to be sent out.
    
    Reviewers: David Jacot <dja...@confluent.io>
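
For context, a minimal caller-side sketch of the behaviour this patch affects (the bootstrap address, group id, and topic below are illustrative assumptions, not values from the patch): with this change, coordinator unavailability is retried inside the admin client by re-sending `FindCoordinator`, so the caller's future fails only for non-retriable errors.

import java.util.Collections;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
import org.apache.kafka.common.TopicPartition;

public class DeleteOffsetsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Hypothetical bootstrap address, for illustration only.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            Set<TopicPartition> partitions =
                Collections.singleton(new TopicPartition("foo", 0));

            DeleteConsumerGroupOffsetsResult result =
                admin.deleteConsumerGroupOffsets("my-group", partitions);

            // With this patch, NOT_COORDINATOR and COORDINATOR_NOT_AVAILABLE both cause the
            // admin client to rediscover the group coordinator and retry internally; the
            // future completes exceptionally only for non-retriable errors such as
            // GROUP_ID_NOT_FOUND or GROUP_AUTHORIZATION_FAILED.
            result.all().get();
        }
    }
}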
---
 .../DeleteConsumerGroupOffsetsHandler.java         |  76 +++++++++------
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  18 ++--
 .../DeleteConsumerGroupOffsetsHandlerTest.java     | 104 ++++++++++++++++-----
 3 files changed, 143 insertions(+), 55 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
index 7e8b549..f766a87 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java
@@ -19,7 +19,7 @@ package org.apache.kafka.clients.admin.internals;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -72,8 +72,19 @@ public class DeleteConsumerGroupOffsetsHandler implements AdminApiHandler<Coordi
         return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
     }
 
+    private void validateKeys(
+        Set<CoordinatorKey> groupIds
+    ) {
+        if (!groupIds.equals(Collections.singleton(groupId))) {
+            throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
+                " (expected only " + Collections.singleton(groupId) + ")");
+        }
+    }
+
     @Override
-    public OffsetDeleteRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> keys) {
+    public OffsetDeleteRequest.Builder buildRequest(int coordinatorId, Set<CoordinatorKey> groupIds) {
+        validateKeys(groupIds);
+
         final OffsetDeleteRequestTopicCollection topics = new OffsetDeleteRequestTopicCollection();
         partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> topics.add(
             new OffsetDeleteRequestTopic()
@@ -97,54 +108,67 @@ public class DeleteConsumerGroupOffsetsHandler implements AdminApiHandler<Coordi
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
-        final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
-        Map<CoordinatorKey, Map<TopicPartition, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        validateKeys(groupIds);
 
+        final OffsetDeleteResponse response = (OffsetDeleteResponse) abstractResponse;
         final Errors error = Errors.forCode(response.data().errorCode());
+
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+            final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+
+            handleGroupError(groupId, error, failed, groupsToUnmap);
+
+            return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap));
         } else {
-            final Map<TopicPartition, Errors> partitions = new HashMap<>();
-            response.data().topics().forEach(topic -> 
+            final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
+            response.data().topics().forEach(topic ->
                 topic.partitions().forEach(partition -> {
                     Errors partitionError = Errors.forCode(partition.errorCode());
-                    if (!handleError(groupId, partitionError, failed, unmapped)) {
-                        partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
-                    }
+
+                    partitionResults.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
                 })
             );
-            if (!partitions.isEmpty())
-                completed.put(groupId, partitions);
+
+            return new ApiResult<>(
+                Collections.singletonMap(groupId, partitionResults),
+                Collections.emptyMap(),
+                Collections.emptyList()
+            );
         }
-        return new ApiResult<>(completed, failed, unmapped);
     }
 
-    private boolean handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
             case GROUP_ID_NOT_FOUND:
             case INVALID_GROUP_ID:
-                log.error("Received non retriable error for group {} in 
`DeleteConsumerGroupOffsets` response", groupId,
-                        error.exception());
+            case NON_EMPTY_GROUP:
+                log.debug("`OffsetDelete` request for group id {} failed due 
to error {}.", groupId.idValue, error);
                 failed.put(groupId, error.exception());
-                return true;
+                break;
             case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`OffsetDelete` request for group id {} failed because the coordinator" +
+                    " is still in the process of loading state. Will retry.", groupId.idValue);
+                break;
             case COORDINATOR_NOT_AVAILABLE:
-                return true;
             case NOT_COORDINATOR:
-                log.debug("DeleteConsumerGroupOffsets request for group {} 
returned error {}. Will retry",
-                        groupId, error);
-                unmapped.add(groupId);
-                return true;
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`OffsetDelete` request for group id {} returned 
error {}. " +
+                    "Will attempt to find the coordinator again and retry.", 
groupId.idValue, error);
+                groupsToUnmap.add(groupId);
+                break;
             default:
-                return false;
+                log.error("`OffsetDelete` request for group id {} failed due 
to unexpected error {}.", groupId.idValue, error);
+                failed.put(groupId, error.exception());
+                break;
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 53e326a..e79890f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -3290,11 +3290,11 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
-            env.kafkaClient().prepareResponse(prepareOffsetDeleteResponse("foo", 0, Errors.NOT_COORDINATOR));
+            env.kafkaClient().prepareResponse(prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             final DeleteConsumerGroupOffsetsResult result = env.adminClient()
-                .deleteConsumerGroupOffsets("groupId", 
Stream.of(tp1).collect(Collectors.toSet()));
+                .deleteConsumerGroupOffsets(GROUP_ID, 
Stream.of(tp1).collect(Collectors.toSet()));
 
             TestUtils.assertFutureError(result.all(), TimeoutException.class);
         }
@@ -3322,7 +3322,8 @@ public class KafkaAdminClientTest {
             mockClient.prepareResponse(body -> {
                 firstAttemptTime.set(time.milliseconds());
                 return true;
-            }, prepareOffsetDeleteResponse("foo", 0, Errors.NOT_COORDINATOR));
+            }, prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));
+
 
             mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
@@ -3402,15 +3403,14 @@ public class KafkaAdminClientTest {
                 prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             env.kafkaClient().prepareResponse(
-                prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE));
-
-            env.kafkaClient().prepareResponse(
                 prepareOffsetDeleteResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
 
             /*
              * We need to return two responses here, one for NOT_COORDINATOR call when calling delete a consumer group
              * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
              * FindCoordinatorResponse.
+             *
+             * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response
              */
             env.kafkaClient().prepareResponse(
                 prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR));
@@ -3419,6 +3419,12 @@ public class KafkaAdminClientTest {
                 prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             env.kafkaClient().prepareResponse(
+                prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE));
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            env.kafkaClient().prepareResponse(
                 prepareOffsetDeleteResponse("foo", 0, Errors.NONE));
 
             final DeleteConsumerGroupOffsetsResult errorResult1 = env.adminClient()
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
index 439b377..b4aea93 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
@@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.message.OffsetDeleteResponseData;
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition;
@@ -67,48 +69,88 @@ public class DeleteConsumerGroupOffsetsHandlerTest {
     @Test
     public void testSuccessfulHandleResponse() {
         Map<TopicPartition, Errors> responseData = Collections.singletonMap(t0p0, Errors.NONE);
-        assertCompleted(handleWithError(Errors.NONE), responseData);
+        assertCompleted(handleWithGroupError(Errors.NONE), responseData);
     }
 
     @Test
     public void testUnmappedHandleResponse() {
-        assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
+        assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR));
+        assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE));
     }
 
     @Test
     public void testRetriableHandleResponse() {
-        assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
-        assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
+        assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
     }
 
     @Test
-    public void testFailedHandleResponse() {
-        assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
-        assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND));
-        assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID));
+    public void testFailedHandleResponseWithGroupError() {
+        assertGroupFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED));
+        assertGroupFailed(GroupIdNotFoundException.class, handleWithGroupError(Errors.GROUP_ID_NOT_FOUND));
+        assertGroupFailed(InvalidGroupIdException.class, handleWithGroupError(Errors.INVALID_GROUP_ID));
+        assertGroupFailed(GroupNotEmptyException.class, handleWithGroupError(Errors.NON_EMPTY_GROUP));
     }
 
-    private OffsetDeleteResponse buildResponse(Errors error) {
+    @Test
+    public void testFailedHandleResponseWithPartitionError() {
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.GROUP_SUBSCRIBED_TO_TOPIC),
+            handleWithPartitionError(Errors.GROUP_SUBSCRIBED_TO_TOPIC));
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.TOPIC_AUTHORIZATION_FAILED),
+            handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED));
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.UNKNOWN_TOPIC_OR_PARTITION),
+            handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION));
+    }
+
+    private OffsetDeleteResponse buildGroupErrorResponse(Errors error) {
+        OffsetDeleteResponse response = new OffsetDeleteResponse(
+            new OffsetDeleteResponseData()
+                .setErrorCode(error.code()));
+        if (error == Errors.NONE) {
+            response.data()
+                .setThrottleTimeMs(0)
+                .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
+                    new OffsetDeleteResponseTopic()
+                        .setName(t0p0.topic())
+                        .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
+                            new OffsetDeleteResponsePartition()
+                                .setPartitionIndex(t0p0.partition())
+                                .setErrorCode(error.code())
+                        ).iterator()))
+                ).iterator()));
+        }
+        return response;
+    }
+
+    private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) {
         OffsetDeleteResponse response = new OffsetDeleteResponse(
-                new OffsetDeleteResponseData()
-                    .setThrottleTimeMs(0)
-                    .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
-                            new OffsetDeleteResponseTopic()
-                                .setName("t0")
-                                .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
-                                        new OffsetDeleteResponsePartition()
-                                            .setPartitionIndex(0)
-                                            .setErrorCode(error.code())
-                                 ).iterator()))
-                      ).iterator())));
+            new OffsetDeleteResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
+                    new OffsetDeleteResponseTopic()
+                        .setName(t0p0.topic())
+                        .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
+                            new OffsetDeleteResponsePartition()
+                                .setPartitionIndex(t0p0.partition())
+                                .setErrorCode(error.code())
+                        ).iterator()))
+                ).iterator()))
+        );
         return response;
     }
 
-    private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithError(
+    private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithGroupError(
         Errors error
     ) {
         DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext);
-        OffsetDeleteResponse response = buildResponse(error);
+        OffsetDeleteResponse response = buildGroupErrorResponse(error);
+        return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
+    }
+
+    private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleWithPartitionError(
+        Errors error
+    ) {
+        DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext);
+        OffsetDeleteResponse response = buildPartitionErrorResponse(error);
         return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response);
     }
 
@@ -139,7 +181,7 @@ public class DeleteConsumerGroupOffsetsHandlerTest {
         assertEquals(expected, result.completedKeys.get(key));
     }
 
-    private void assertFailed(
+    private void assertGroupFailed(
         Class<? extends Throwable> expectedExceptionType,
         AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> result
     ) {
@@ -149,4 +191,20 @@ public class DeleteConsumerGroupOffsetsHandlerTest {
         assertEquals(singleton(key), result.failedKeys.keySet());
         assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key)));
     }
+
+    private void assertPartitionFailed(
+        Map<TopicPartition, Errors> expectedResult,
+        AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> result
+    ) {
+        CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
+        assertEquals(singleton(key), result.completedKeys.keySet());
+
+        // verify the completed value is expected result
+        Collection<Map<TopicPartition, Errors>> completeCollection = 
result.completedKeys.values();
+        assertEquals(1, completeCollection.size());
+        assertEquals(expectedResult, result.completedKeys.get(key));
+
+        assertEquals(emptyList(), result.unmappedKeys);
+        assertEquals(emptySet(), result.failedKeys.keySet());
+    }
 }
