This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 612e1299e46 KAFKA-18230: Handle not controller or not leader error in admin client (#18165)
612e1299e46 is described below
commit 612e1299e46ee72bcf763257e0d99dd9a2c48bf6
Author: Luke Chen <[email protected]>
AuthorDate: Wed Feb 5 00:51:24 2025 +0900
KAFKA-18230: Handle not controller or not leader error in admin client (#18165)
Reviewers: Mickael Maison <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 9 +
.../clients/admin/AdminClientUnitTestEnv.java | 3 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 219 ++++++++++++++++++++-
.../kafka/server/KRaftClusterTest.scala | 50 ++++-
4 files changed, 267 insertions(+), 14 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 3f88cd5aa80..ed693b497e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2656,6 +2656,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse abstractResponse) {
+ handleNotControllerError(abstractResponse);
CreateAclsResponse response = (CreateAclsResponse)
abstractResponse;
List<AclCreationResult> responses = response.results();
Iterator<AclCreationResult> iter = responses.iterator();
@@ -2708,6 +2709,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse abstractResponse) {
+ handleNotControllerError(abstractResponse);
DeleteAclsResponse response = (DeleteAclsResponse)
abstractResponse;
List<DeleteAclsResponseData.DeleteAclsFilterResult> results =
response.filterResults();
Iterator<DeleteAclsResponseData.DeleteAclsFilterResult> iter =
results.iterator();
@@ -2926,6 +2928,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public void handleResponse(AbstractResponse abstractResponse) {
+ handleNotControllerError(abstractResponse);
IncrementalAlterConfigsResponse response =
(IncrementalAlterConfigsResponse) abstractResponse;
Map<ConfigResource, ApiError> errors =
IncrementalAlterConfigsResponse.fromResponseData(response.data());
for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry :
futures.entrySet()) {
@@ -4089,8 +4092,11 @@ public class KafkaAdminClient extends AdminClient {
}
private void handleNotControllerError(AbstractResponse response) throws
ApiException {
+ // In bootstrap-controllers mode a request may land on a non-active controller, which can answer NOT_LEADER_OR_FOLLOWER instead of NOT_CONTROLLER; treat both the same way so the call is retried against a refreshed controller (broker-side NOT_LEADER_OR_FOLLOWER is left to the caller).
if (response.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
handleNotControllerError(Errors.NOT_CONTROLLER);
+ } else if (metadataManager.usingBootstrapControllers() &&
response.errorCounts().containsKey(Errors.NOT_LEADER_OR_FOLLOWER)) {
+ handleNotControllerError(Errors.NOT_LEADER_OR_FOLLOWER);
}
}
@@ -4652,6 +4658,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse response) {
+ handleNotControllerError(response);
final DescribeQuorumResponse quorumResponse =
(DescribeQuorumResponse) response;
if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
throw
Errors.forCode(quorumResponse.data().errorCode()).exception(quorumResponse.data().errorMessage());
@@ -4849,6 +4856,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse response) {
+ handleNotControllerError(response);
AddRaftVoterResponse addResponse = (AddRaftVoterResponse)
response;
if (addResponse.data().errorCode() != Errors.NONE.code()) {
ApiError error = new ApiError(
@@ -4893,6 +4901,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse response) {
+ handleNotControllerError(response);
RemoveRaftVoterResponse addResponse =
(RemoveRaftVoterResponse) response;
if (addResponse.data().errorCode() != Errors.NONE.code()) {
ApiError error = new ApiError(
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index 99c823e78f4..add2256b2b3 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -73,7 +73,8 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
AdminMetadataManager metadataManager = new AdminMetadataManager(new
LogContext(),
adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
-
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
+
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG),
+
config.containsKey(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG));
this.mockClient = new MockClient(time, new
MockClient.MockMetadataUpdater() {
@Override
public List<Node> fetchNodes() {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 07b4b55e268..2f0bdaa7a98 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1509,7 +1509,8 @@ public class KafkaAdminClientTest {
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
2,
- MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)
+ MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+ false)
);
DescribeTopicPartitionsResponseData dataFirstPart = new
DescribeTopicPartitionsResponseData();
@@ -1566,7 +1567,8 @@ public class KafkaAdminClientTest {
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
2,
- authorisedOperations)
+ authorisedOperations,
+ false)
);
DescribeTopicPartitionsResponseData responseData = new
DescribeTopicPartitionsResponseData();
@@ -1602,7 +1604,8 @@ public class KafkaAdminClientTest {
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
2,
- authorisedOperations)
+ authorisedOperations,
+ false)
);
DescribeTopicPartitionsResponseData responseData = new
DescribeTopicPartitionsResponseData();
@@ -1642,7 +1645,8 @@ public class KafkaAdminClientTest {
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
2,
- MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)
+ MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+ false)
);
DescribeTopicPartitionsResponseData dataFirstPart = new
DescribeTopicPartitionsResponseData();
@@ -1742,7 +1746,8 @@ public class KafkaAdminClientTest {
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
2,
- MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)
+ MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+ false)
);
DescribeTopicPartitionsResponseData dataFirstPart = new
DescribeTopicPartitionsResponseData();
@@ -1886,6 +1891,71 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testCreateAclsToController() throws Exception {
+ try (AdminClientUnitTestEnv env =
mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ // First attempt: the contacted controller is not active and rejects the call with NOT_CONTROLLER.
+ env.kafkaClient().prepareResponse(new CreateAclsResponse(new
CreateAclsResponseData().setResults(asList(
+ new CreateAclsResponseData.AclCreationResult()
+ .setErrorCode(Errors.NOT_CONTROLLER.code())
+ .setErrorMessage("not controller")))));
+ // The client should then re-send DescribeCluster to refresh the controller metadata.
+ env.kafkaClient().prepareResponse(
+ prepareDescribeClusterResponse(0,
+ env.cluster().nodes(),
+ env.cluster().clusterResource().clusterId(),
+ 2,
+ MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+ true)
+ );
+
+ // The retried call succeeds in creating the single requested ACL (ACL1).
+ env.kafkaClient().prepareResponse(new CreateAclsResponse(new
CreateAclsResponseData().setResults(asList(
+ new CreateAclsResponseData.AclCreationResult()))));
+
+ CreateAclsResult results =
env.adminClient().createAcls(asList(ACL1));
+ assertCollectionIs(results.values().keySet(), ACL1);
+ for (KafkaFuture<Void> future : results.values().values())
+ future.get();
+ results.all().get();
+ }
+ }
+
+ @Test
+ public void testDeleteAclsToController() throws Exception {
+ try (AdminClientUnitTestEnv env =
mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ // First attempt: the contacted controller is not active and the filter result carries NOT_CONTROLLER.
+ env.kafkaClient().prepareResponse(new DeleteAclsResponse(new
DeleteAclsResponseData()
+ .setThrottleTimeMs(0)
+ .setFilterResults(asList(new
DeleteAclsResponseData.DeleteAclsFilterResult()
+ .setErrorCode(Errors.NOT_CONTROLLER.code())
+ .setErrorMessage("not controller"))),
+ ApiKeys.DELETE_ACLS.latestVersion()));
+ // The client should then re-send DescribeCluster to refresh the controller metadata.
+ env.kafkaClient().prepareResponse(
+ prepareDescribeClusterResponse(0,
+ env.cluster().nodes(),
+ env.cluster().clusterResource().clusterId(),
+ 2,
+ MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+ true)
+ );
+ // The retried call succeeds and reports ACL1 as deleted.
+ env.kafkaClient().prepareResponse(new DeleteAclsResponse(new
DeleteAclsResponseData()
+ .setThrottleTimeMs(0)
+ .setFilterResults(asList(
+ new DeleteAclsResponseData.DeleteAclsFilterResult()
+
.setMatchingAcls(singletonList(DeleteAclsResponse.matchingAcl(ACL1,
ApiError.NONE))))),
+ ApiKeys.DELETE_ACLS.latestVersion()));
+ DeleteAclsResult results =
env.adminClient().deleteAcls(asList(FILTER1));
+ Collection<AclBinding> deleted = results.all().get();
+ assertCollectionIs(deleted, ACL1);
+ }
+ }
+
@Test
public void testCreateAcls() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -2809,7 +2879,8 @@ public class KafkaAdminClientTest {
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
2,
- MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED));
+ MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+ false));
// Prepare the describe cluster response used for the second
describe cluster
env.kafkaClient().prepareResponse(
@@ -2817,7 +2888,8 @@ public class KafkaAdminClientTest {
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
3,
- 1 << AclOperation.DESCRIBE.code() | 1 <<
AclOperation.ALTER.code()));
+ 1 << AclOperation.DESCRIBE.code() | 1 <<
AclOperation.ALTER.code(),
+ false));
// Test DescribeCluster with the authorized operations omitted.
final DescribeClusterResult result =
env.adminClient().describeCluster();
@@ -2866,7 +2938,8 @@ public class KafkaAdminClientTest {
Collection<Node> brokers,
String clusterId,
int controllerId,
- int clusterAuthorizedOperations
+ int clusterAuthorizedOperations,
+ boolean sentToController
) {
DescribeClusterResponseData data = new DescribeClusterResponseData()
.setErrorCode(Errors.NONE.code())
@@ -2875,6 +2948,10 @@ public class KafkaAdminClientTest {
.setClusterId(clusterId)
.setClusterAuthorizedOperations(clusterAuthorizedOperations);
+ if (sentToController) {
+ data.setEndpointType(EndpointType.CONTROLLER.id());
+ }
+
brokers.forEach(broker ->
data.brokers().add(new DescribeClusterBroker()
.setHost(broker.host())
@@ -5677,6 +5754,51 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testIncrementalAlterConfigsToController() throws Exception {
+ try (AdminClientUnitTestEnv env =
mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ // First attempt: a non-active controller rejects the broker-config change with NOT_CONTROLLER.
+ IncrementalAlterConfigsResponseData responseData = new
IncrementalAlterConfigsResponseData();
+ responseData.responses().add(new AlterConfigsResourceResponse()
+ .setResourceName("")
+ .setResourceType(ConfigResource.Type.BROKER.id())
+ .setErrorCode(Errors.NOT_CONTROLLER.code())
+ .setErrorMessage("not controller"));
+
+ env.kafkaClient().prepareResponse(new
IncrementalAlterConfigsResponse(responseData));
+
+ // The client should then re-send DescribeCluster to refresh the controller metadata.
+ env.kafkaClient().prepareResponse(
+ prepareDescribeClusterResponse(0,
+ env.cluster().nodes(),
+ env.cluster().clusterResource().clusterId(),
+ 2,
+ MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+ true)
+ );
+ // Response for the retried request: the same resource, now without error.
+ IncrementalAlterConfigsResponseData responseData2 = new
IncrementalAlterConfigsResponseData();
+ responseData2.responses().add(new AlterConfigsResourceResponse()
+ .setResourceName("")
+ .setResourceType(ConfigResource.Type.BROKER.id())
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(ApiError.NONE.message()));
+
+ ConfigResource brokerResource = new
ConfigResource(ConfigResource.Type.BROKER, "");
+
+ AlterConfigOp alterConfigOp1 = new AlterConfigOp(
+ new ConfigEntry("log.segment.bytes", "1073741"),
+ AlterConfigOp.OpType.SET);
+
+ final Map<ConfigResource, Collection<AlterConfigOp>> configs = new
HashMap<>();
+ configs.put(brokerResource, singletonList(alterConfigOp1));
+ env.kafkaClient().prepareResponse(new
IncrementalAlterConfigsResponse(responseData2));
+ env.adminClient().incrementalAlterConfigs(configs).all().get();
+ }
+ }
+
@Test
public void testRemoveMembersFromGroupNumRetries() throws Exception {
final Cluster cluster = mockCluster(3, 0);
@@ -8889,7 +9011,7 @@ public class KafkaAdminClientTest {
@ParameterizedTest
@CsvSource({ "false, false", "false, true", "true, false", "true, true" })
public void testAddRaftVoterRequest(boolean fail, boolean sendClusterId)
throws Exception {
- try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ try (AdminClientUnitTestEnv env =
mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) {
AddRaftVoterResponseData responseData = new
AddRaftVoterResponseData();
if (fail) {
responseData.
@@ -8930,13 +9052,52 @@ public class KafkaAdminClientTest {
setName("CONTROLLER").
setHost("example.com").
setPort(8080),
requestData.get().listeners().find("CONTROLLER"));
+
+ // In the fail case, we continue to test the
`NOT_LEADER_OR_FOLLOWER` error case
+ if (fail && !sendClusterId) {
+ responseData.
+ setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()).
+ setErrorMessage("test");
+ env.kafkaClient().prepareResponse(
+ request -> {
+ if (!(request instanceof AddRaftVoterRequest))
return false;
+ requestData.set((AddRaftVoterRequestData)
request.data());
+ return true;
+ },
+ new AddRaftVoterResponse(responseData));
+
+ // should retry the describe cluster to update the metadata
+ env.kafkaClient().prepareResponse(
+ prepareDescribeClusterResponse(0,
+ env.cluster().nodes(),
+ env.cluster().clusterResource().clusterId(),
+ 2,
+ MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+ true)
+ );
+
+ AddRaftVoterResponseData responseData2 = new
AddRaftVoterResponseData();
+ env.kafkaClient().prepareResponse(
+ request -> {
+ if (!(request instanceof AddRaftVoterRequest))
return false;
+ requestData.set((AddRaftVoterRequestData)
request.data());
+ return true;
+ },
+ new AddRaftVoterResponse(responseData2));
+
+ AddRaftVoterResult result2 = env.adminClient().addRaftVoter(1,
+ Uuid.fromString("YAfa4HClT3SIIW2klIUspg"),
+ Collections.singleton(new
RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
+ options);
+ result2.all().get();
+ }
}
}
@ParameterizedTest
@CsvSource({ "false, false", "false, true", "true, false", "true, true" })
public void testRemoveRaftVoterRequest(boolean fail, boolean
sendClusterId) throws Exception {
- try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ try (AdminClientUnitTestEnv env =
mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) {
RemoveRaftVoterResponseData responseData = new
RemoveRaftVoterResponseData();
if (fail) {
responseData.
@@ -8971,6 +9132,44 @@ public class KafkaAdminClientTest {
}
assertEquals(1, requestData.get().voterId());
assertEquals(Uuid.fromString("YAfa4HClT3SIIW2klIUspg"),
requestData.get().voterDirectoryId());
+
+ // In the fail case, we continue to test the
`NOT_LEADER_OR_FOLLOWER` error case
+ if (fail && !sendClusterId) {
+ responseData.
+ setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()).
+ setErrorMessage("test");
+ env.kafkaClient().prepareResponse(
+ request -> {
+ if (!(request instanceof RemoveRaftVoterRequest))
return false;
+ requestData.set((RemoveRaftVoterRequestData)
request.data());
+ return true;
+ },
+ new RemoveRaftVoterResponse(responseData));
+
+ // should retry the describe cluster to update the metadata
+ env.kafkaClient().prepareResponse(
+ prepareDescribeClusterResponse(0,
+ env.cluster().nodes(),
+ env.cluster().clusterResource().clusterId(),
+ 2,
+ MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+ true)
+ );
+
+ RemoveRaftVoterResponseData responseData2 = new
RemoveRaftVoterResponseData();
+ env.kafkaClient().prepareResponse(
+ request -> {
+ if (!(request instanceof RemoveRaftVoterRequest))
return false;
+ requestData.set((RemoveRaftVoterRequestData)
request.data());
+ return true;
+ },
+ new RemoveRaftVoterResponse(responseData2));
+
+ RemoveRaftVoterResult result2 =
env.adminClient().removeRaftVoter(1,
+ Uuid.fromString("YAfa4HClT3SIIW2klIUspg"),
+ options);
+ result2.all().get();
+ }
}
}
}
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 7e0b10d1d6a..44bfa9f1734 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -872,9 +872,12 @@ class KRaftClusterTest {
}
}
- def createAdminClient(cluster: KafkaClusterTestKit): Admin = {
+ def createAdminClient(cluster: KafkaClusterTestKit, bootstrapController:
Boolean): Admin = {
- var props: Properties = null
- props = cluster.clientProperties()
+ val props: Properties =
+ if (bootstrapController)
+
cluster.newClientPropertiesBuilder().setUsingBootstrapControllers(true).build()
+ else
+ cluster.clientProperties()
props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
Admin.create(props)
}
@@ -892,7 +895,7 @@ class KRaftClusterTest {
TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState ==
BrokerState.RUNNING,
"Broker Never started up")
}
- val admin = createAdminClient(cluster)
+ val admin = createAdminClient(cluster, bootstrapController = false)
try {
val quorumState = admin.describeMetadataQuorum(new
DescribeMetadataQuorumOptions)
val quorumInfo = quorumState.quorumInfo.get()
@@ -936,6 +939,47 @@ class KRaftClusterTest {
}
}
+ @Test
+ def testDescribeQuorumRequestToControllers() : Unit = {
+ val cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(4).
+ setNumControllerNodes(3).build()).build()
+ try {
+ cluster.format()
+ cluster.startup()
+ for (i <- 0 to 3) {
+ TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState ==
BrokerState.RUNNING,
+ "Broker Never started up")
+ }
+ val admin = createAdminClient(cluster, bootstrapController = true)
+ try {
+ val quorumInfo = admin.describeMetadataQuorum(new
DescribeMetadataQuorumOptions).quorumInfo.get()
+
+ assertEquals(cluster.controllers.asScala.keySet,
quorumInfo.voters.asScala.map(_.replicaId).toSet)
+
assertTrue(cluster.controllers.asScala.keySet.contains(quorumInfo.leaderId),
+ s"Leader ID ${quorumInfo.leaderId} was not a controller ID.")
+
+ // Try to bring down the raft client in the active controller node to
force the leader election.
+
cluster.controllers().get(quorumInfo.leaderId).sharedServer.raftManager.client.shutdown(1000)
+ // Send another describe metadata quorum request; it may first get a
NOT_LEADER_OR_FOLLOWER error, then re-retrieve the metadata
+ // and send to the new active controller.
+ val quorumInfo2 = admin.describeMetadataQuorum(new
DescribeMetadataQuorumOptions)
+ .quorumInfo().get()
+ // Make sure the leader has changed
+ assertTrue(quorumInfo.leaderId() != quorumInfo2.leaderId(), s"Leader should have changed from ${quorumInfo.leaderId} but is still ${quorumInfo2.leaderId}.")
+
+ assertEquals(cluster.controllers.asScala.keySet,
quorumInfo2.voters.asScala.map(_.replicaId).toSet)
+
assertTrue(cluster.controllers.asScala.keySet.contains(quorumInfo2.leaderId),
+ s"Leader ID ${quorumInfo2.leaderId} was not a controller ID.")
+ } finally {
+ admin.close()
+ }
+ } finally {
+ cluster.close()
+ }
+ }
+
@Test
def testUpdateMetadataVersion(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(