This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 37584ce KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable
error for AlterConsumerGroupOffsetsHandler (#11086)
37584ce is described below
commit 37584ce4f57a9e087e9c9280803e556a856d95e8
Author: Luke Chen <[email protected]>
AuthorDate: Sat Sep 4 02:29:44 2021 +0800
KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error for
AlterConsumerGroupOffsetsHandler (#11086)
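This patch makes `AlterConsumerGroupOffsetsHandler` treat `REBALANCE_IN_PROGRESS` as a
retriable error instead of a fatal group-level error, and adds tests for it. It also
includes minor cleanups in the neighbouring group handlers and in `KafkaAdminClientTest`.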
Reviewers: David Jacot <[email protected]>
---
.../admin/internals/AlterConsumerGroupOffsetsHandler.java | 10 +++++-----
.../admin/internals/ListConsumerGroupOffsetsHandler.java | 1 -
.../admin/internals/RemoveMembersFromConsumerGroupHandler.java | 7 +------
.../org/apache/kafka/clients/admin/KafkaAdminClientTest.java | 9 ++++++---
.../admin/internals/AlterConsumerGroupOffsetsHandlerTest.java | 2 +-
5 files changed, 13 insertions(+), 16 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
index 7ac90b6..cb7551e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
@@ -159,24 +159,24 @@ public class AlterConsumerGroupOffsetsHandler implements
AdminApiHandler<Coordin
Set<CoordinatorKey> groupsToRetry
) {
switch (error) {
- // If the coordinator is in the middle of loading, then we just
need to retry.
+ // If the coordinator is in the middle of loading, or rebalance is
in progress, then we just need to retry.
case COORDINATOR_LOAD_IN_PROGRESS:
- log.debug("OffsetCommit request for group id {} failed because
the coordinator" +
- " is still in the process of loading state. Will retry.",
groupId.idValue);
+ case REBALANCE_IN_PROGRESS:
+ log.debug("OffsetCommit request for group id {} returned error
{}. Will retry.",
+ groupId.idValue, error);
groupsToRetry.add(groupId);
break;
// If the coordinator is not available, then we unmap and retry.
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
- log.debug("OffsetCommit request for group id {} returned error
{}. Will retry.",
+ log.debug("OffsetCommit request for group id {} returned error
{}. Will rediscover the coordinator and retry.",
groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
// Group level errors.
case INVALID_GROUP_ID:
- case REBALANCE_IN_PROGRESS:
case INVALID_COMMIT_OFFSET_SIZE:
case GROUP_AUTHORIZATION_FAILED:
log.debug("OffsetCommit request for group id {} failed due to
error {}.",
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
index d5b2105..b1d2e9d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
@@ -139,7 +139,6 @@ public class ListConsumerGroupOffsetsHandler implements
AdminApiHandler<Coordina
log.debug("`OffsetFetch` request for group id {} failed due to
error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
break;
-
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we
just need to retry
log.debug("`OffsetFetch` request for group id {} failed
because the coordinator " +
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
index e463911..90b3865 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
@@ -110,11 +110,7 @@ public class RemoveMembersFromConsumerGroupHandler
implements AdminApiHandler<Co
Errors.forCode(memberResponse.errorCode()));
}
- return new ApiResult<>(
- Collections.singletonMap(groupId, memberErrors),
- Collections.emptyMap(),
- Collections.emptyList()
- );
+ return ApiResult.completed(groupId, memberErrors);
}
}
@@ -129,7 +125,6 @@ public class RemoveMembersFromConsumerGroupHandler
implements AdminApiHandler<Co
log.debug("`LeaveGroup` request for group id {} failed due to
error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
break;
-
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we
just need to retry
log.debug("`LeaveGroup` request for group id {} failed because
the coordinator " +
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 46542db..b648b2d 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -3199,7 +3199,7 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3,
describeGroups)));
- //Retriable FindCoordinatorResponse errors should be retried
+ // Retriable FindCoordinatorResponse errors should be retried
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,
Node.noNode()));
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,
Node.noNode()));
@@ -3219,9 +3219,9 @@ public class KafkaAdminClientTest {
final KafkaFuture<Void> results =
result.deletedGroups().get("groupId");
assertNull(results.get());
- //should throw error for non-retriable errors
+ // should throw error for non-retriable errors
env.kafkaClient().prepareResponse(
-
prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,
Node.noNode()));
+
prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,
Node.noNode()));
DeleteConsumerGroupsResult errorResult =
env.adminClient().deleteConsumerGroups(groupIds);
TestUtils.assertFutureError(errorResult.deletedGroups().get("groupId"),
GroupAuthorizationException.class);
@@ -4174,6 +4174,9 @@ public class KafkaAdminClientTest {
prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
env.kafkaClient().prepareResponse(
+ prepareOffsetCommitResponse(tp1,
Errors.REBALANCE_IN_PROGRESS));
+
+ env.kafkaClient().prepareResponse(
prepareOffsetCommitResponse(tp1, Errors.NONE));
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
index 8988c0f..c0ea2ba 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java
@@ -83,6 +83,7 @@ public class AlterConsumerGroupOffsetsHandlerTest {
assertUnmappedKey(partitionErrors(Errors.NOT_COORDINATOR));
assertUnmappedKey(partitionErrors(Errors.COORDINATOR_NOT_AVAILABLE));
assertRetriableError(partitionErrors(Errors.COORDINATOR_LOAD_IN_PROGRESS));
+ assertRetriableError(partitionErrors(Errors.REBALANCE_IN_PROGRESS));
}
@Test
@@ -94,7 +95,6 @@ public class AlterConsumerGroupOffsetsHandlerTest {
assertFatalError(partitionErrors(Errors.OFFSET_METADATA_TOO_LARGE));
assertFatalError(partitionErrors(Errors.ILLEGAL_GENERATION));
assertFatalError(partitionErrors(Errors.UNKNOWN_MEMBER_ID));
- assertFatalError(partitionErrors(Errors.REBALANCE_IN_PROGRESS));
assertFatalError(partitionErrors(Errors.INVALID_COMMIT_OFFSET_SIZE));
assertFatalError(partitionErrors(Errors.UNKNOWN_SERVER_ERROR));
}