This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4f38c8c KAFKA-7369; Handle retriable errors in AdminClient list
groups API (#5595)
4f38c8c is described below
commit 4f38c8cd6079100548ca8056c358a2fea57986c2
Author: Jason Gustafson <[email protected]>
AuthorDate: Fri Aug 31 18:40:42 2018 -0700
KAFKA-7369; Handle retriable errors in AdminClient list groups API (#5595)
We should retry when possible if ListGroups fails due to a retriable error
(e.g. coordinator loading).
Reviewers: Colin Patrick McCabe <[email protected]>, Guozhang Wang
<[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 7 ++--
.../kafka/common/requests/ListGroupsResponse.java | 1 +
.../kafka/clients/admin/KafkaAdminClientTest.java | 40 +++++++++++++++++++---
3 files changed, 41 insertions(+), 7 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 8ddb0c0..904cd06 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2567,8 +2567,11 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse abstractResponse)
{
final ListGroupsResponse response =
(ListGroupsResponse) abstractResponse;
synchronized (results) {
- if (response.error() != Errors.NONE) {
-
results.addError(response.error().exception(), node);
+ Errors error = response.error();
+ if (error ==
Errors.COORDINATOR_LOAD_IN_PROGRESS || error ==
Errors.COORDINATOR_NOT_AVAILABLE) {
+ throw error.exception();
+ } else if (error != Errors.NONE) {
+ results.addError(error.exception(), node);
} else {
for (ListGroupsResponse.Group group :
response.groups()) {
maybeAddConsumerGroup(group);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index b108803..af6f721 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -62,6 +62,7 @@ public class ListGroupsResponse extends AbstractResponse {
/**
* Possible error codes:
*
+ * COORDINATOR_LOADING_IN_PROGRESS (14)
* COORDINATOR_NOT_AVAILABLE (15)
* AUTHORIZATION_FAILED (29)
*/
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0245cbd..c0dc542 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
@@ -46,6 +45,7 @@ import
org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
+import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
@@ -863,9 +863,11 @@ public class KafkaAdminClientTest {
Node node0 = new Node(0, "localhost", 8121);
Node node1 = new Node(1, "localhost", 8122);
Node node2 = new Node(2, "localhost", 8123);
+ Node node3 = new Node(3, "localhost", 8124);
nodes.put(0, node0);
nodes.put(1, node1);
nodes.put(2, node2);
+ nodes.put(3, node3);
final Cluster cluster = new Cluster(
"mockClusterId",
@@ -902,13 +904,19 @@ public class KafkaAdminClientTest {
)),
node0);
+ // handle retriable errors
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
Errors.COORDINATOR_NOT_AVAILABLE,
Collections.emptyList()
),
node1);
-
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ Errors.COORDINATOR_LOAD_IN_PROGRESS,
+ Collections.emptyList()
+ ),
+ node1);
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
Errors.NONE,
@@ -916,15 +924,37 @@ public class KafkaAdminClientTest {
new ListGroupsResponse.Group("group-2",
ConsumerProtocol.PROTOCOL_TYPE),
new
ListGroupsResponse.Group("group-connect-2", "connector")
)),
+ node1);
+
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ Errors.NONE,
+ asList(
+ new ListGroupsResponse.Group("group-3",
ConsumerProtocol.PROTOCOL_TYPE),
+ new
ListGroupsResponse.Group("group-connect-3", "connector")
+ )),
node2);
+ // fatal error
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ Errors.UNKNOWN_SERVER_ERROR,
+ Collections.emptyList()),
+ node3);
+
+
final ListConsumerGroupsResult result =
env.adminClient().listConsumerGroups();
- TestUtils.assertFutureError(result.all(),
CoordinatorNotAvailableException.class);
+ TestUtils.assertFutureError(result.all(),
UnknownServerException.class);
+
Collection<ConsumerGroupListing> listings = result.valid().get();
- assertEquals(2, listings.size());
+ assertEquals(3, listings.size());
+
+ Set<String> groupIds = new HashSet<>();
for (ConsumerGroupListing listing : listings) {
- assertTrue(listing.groupId().equals("group-1") ||
listing.groupId().equals("group-2"));
+ groupIds.add(listing.groupId());
}
+
+ assertEquals(Utils.mkSet("group-1", "group-2", "group-3"),
groupIds);
assertEquals(1, result.errors().get().size());
}
}