This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 37584ce  KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable 
error for AlterConsumerGroupOffsetsHandler (#11086)
37584ce is described below

commit 37584ce4f57a9e087e9c9280803e556a856d95e8
Author: Luke Chen <[email protected]>
AuthorDate: Sat Sep 4 02:29:44 2021 +0800

    KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error for 
AlterConsumerGroupOffsetsHandler (#11086)
    
    This patch adds `REBALANCE_IN_PROGRESS` error as retriable error for 
`AlterConsumerGroupOffsetsHandler`, and tests for it.
    
    Reviewers: David Jacot <[email protected]>
---
 .../admin/internals/AlterConsumerGroupOffsetsHandler.java      | 10 +++++-----
 .../admin/internals/ListConsumerGroupOffsetsHandler.java       |  1 -
 .../admin/internals/RemoveMembersFromConsumerGroupHandler.java |  7 +------
 .../org/apache/kafka/clients/admin/KafkaAdminClientTest.java   |  9 ++++++---
 .../admin/internals/AlterConsumerGroupOffsetsHandlerTest.java  |  2 +-
 5 files changed, 13 insertions(+), 16 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
index 7ac90b6..cb7551e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
@@ -159,24 +159,24 @@ public class AlterConsumerGroupOffsetsHandler implements 
AdminApiHandler<Coordin
         Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
-            // If the coordinator is in the middle of loading, then we just 
need to retry.
+            // If the coordinator is in the middle of loading, or rebalance is 
in progress, then we just need to retry.
             case COORDINATOR_LOAD_IN_PROGRESS:
-                log.debug("OffsetCommit request for group id {} failed because 
the coordinator" +
-                    " is still in the process of loading state. Will retry.", 
groupId.idValue);
+            case REBALANCE_IN_PROGRESS:
+                log.debug("OffsetCommit request for group id {} returned error 
{}. Will retry.",
+                    groupId.idValue, error);
                 groupsToRetry.add(groupId);
                 break;
 
             // If the coordinator is not available, then we unmap and retry.
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("OffsetCommit request for group id {} returned error 
{}. Will retry.",
+                log.debug("OffsetCommit request for group id {} returned error 
{}. Will rediscover the coordinator and retry.",
                     groupId.idValue, error);
                 groupsToUnmap.add(groupId);
                 break;
 
             // Group level errors.
             case INVALID_GROUP_ID:
-            case REBALANCE_IN_PROGRESS:
             case INVALID_COMMIT_OFFSET_SIZE:
             case GROUP_AUTHORIZATION_FAILED:
                 log.debug("OffsetCommit request for group id {} failed due to 
error {}.",
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
index d5b2105..b1d2e9d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
@@ -139,7 +139,6 @@ public class ListConsumerGroupOffsetsHandler implements 
AdminApiHandler<Coordina
                 log.debug("`OffsetFetch` request for group id {} failed due to 
error {}", groupId.idValue, error);
                 failed.put(groupId, error.exception());
                 break;
-
             case COORDINATOR_LOAD_IN_PROGRESS:
                 // If the coordinator is in the middle of loading, then we 
just need to retry
                 log.debug("`OffsetFetch` request for group id {} failed 
because the coordinator " +
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
index e463911..90b3865 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
@@ -110,11 +110,7 @@ public class RemoveMembersFromConsumerGroupHandler 
implements AdminApiHandler<Co
                                  Errors.forCode(memberResponse.errorCode()));
             }
 
-            return new ApiResult<>(
-                Collections.singletonMap(groupId, memberErrors),
-                Collections.emptyMap(),
-                Collections.emptyList()
-            );
+            return ApiResult.completed(groupId, memberErrors);
         }
     }
 
@@ -129,7 +125,6 @@ public class RemoveMembersFromConsumerGroupHandler 
implements AdminApiHandler<Co
                 log.debug("`LeaveGroup` request for group id {} failed due to 
error {}", groupId.idValue, error);
                 failed.put(groupId, error.exception());
                 break;
-
             case COORDINATOR_LOAD_IN_PROGRESS:
                 // If the coordinator is in the middle of loading, then we 
just need to retry
                 log.debug("`LeaveGroup` request for group id {} failed because 
the coordinator " +
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 46542db..b648b2d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -3199,7 +3199,7 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
             
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3,
 describeGroups)));
 
-            //Retriable FindCoordinatorResponse errors should be retried
+            // Retriable FindCoordinatorResponse errors should be retried
             
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,
  Node.noNode()));
             
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,
 Node.noNode()));
 
@@ -3219,9 +3219,9 @@ public class KafkaAdminClientTest {
             final KafkaFuture<Void> results = 
result.deletedGroups().get("groupId");
             assertNull(results.get());
 
-            //should throw error for non-retriable errors
+            // should throw error for non-retriable errors
             env.kafkaClient().prepareResponse(
-                
prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,  
Node.noNode()));
+                
prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, 
Node.noNode()));
 
             DeleteConsumerGroupsResult errorResult = 
env.adminClient().deleteConsumerGroups(groupIds);
             
TestUtils.assertFutureError(errorResult.deletedGroups().get("groupId"), 
GroupAuthorizationException.class);
@@ -4174,6 +4174,9 @@ public class KafkaAdminClientTest {
                 prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
             env.kafkaClient().prepareResponse(
+                prepareOffsetCommitResponse(tp1, 
Errors.REBALANCE_IN_PROGRESS));
+
+            env.kafkaClient().prepareResponse(
                 prepareOffsetCommitResponse(tp1, Errors.NONE));
 
             Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
index 8988c0f..c0ea2ba 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
@@ -83,6 +83,7 @@ public class AlterConsumerGroupOffsetsHandlerTest {
         assertUnmappedKey(partitionErrors(Errors.NOT_COORDINATOR));
         assertUnmappedKey(partitionErrors(Errors.COORDINATOR_NOT_AVAILABLE));
         
assertRetriableError(partitionErrors(Errors.COORDINATOR_LOAD_IN_PROGRESS));
+        assertRetriableError(partitionErrors(Errors.REBALANCE_IN_PROGRESS));
     }
 
     @Test
@@ -94,7 +95,6 @@ public class AlterConsumerGroupOffsetsHandlerTest {
         assertFatalError(partitionErrors(Errors.OFFSET_METADATA_TOO_LARGE));
         assertFatalError(partitionErrors(Errors.ILLEGAL_GENERATION));
         assertFatalError(partitionErrors(Errors.UNKNOWN_MEMBER_ID));
-        assertFatalError(partitionErrors(Errors.REBALANCE_IN_PROGRESS));
         assertFatalError(partitionErrors(Errors.INVALID_COMMIT_OFFSET_SIZE));
         assertFatalError(partitionErrors(Errors.UNKNOWN_SERVER_ERROR));
     }

Reply via email to