This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 612e1299e46 KAFKA-18230: Handle not controller or not leader error in 
admin client (#18165)
612e1299e46 is described below

commit 612e1299e46ee72bcf763257e0d99dd9a2c48bf6
Author: Luke Chen <[email protected]>
AuthorDate: Wed Feb 5 00:51:24 2025 +0900

    KAFKA-18230: Handle not controller or not leader error in admin client 
(#18165)
    
    
    Reviewers: Mickael Maison <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |   9 +
 .../clients/admin/AdminClientUnitTestEnv.java      |   3 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 219 ++++++++++++++++++++-
 .../kafka/server/KRaftClusterTest.scala            |  50 ++++-
 4 files changed, 267 insertions(+), 14 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 3f88cd5aa80..ed693b497e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2656,6 +2656,7 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
+                handleNotControllerError(abstractResponse);
                 CreateAclsResponse response = (CreateAclsResponse) 
abstractResponse;
                 List<AclCreationResult> responses = response.results();
                 Iterator<AclCreationResult> iter = responses.iterator();
@@ -2708,6 +2709,7 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
+                handleNotControllerError(abstractResponse);
                 DeleteAclsResponse response = (DeleteAclsResponse) 
abstractResponse;
                 List<DeleteAclsResponseData.DeleteAclsFilterResult> results = 
response.filterResults();
                 Iterator<DeleteAclsResponseData.DeleteAclsFilterResult> iter = 
results.iterator();
@@ -2926,6 +2928,7 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             public void handleResponse(AbstractResponse abstractResponse) {
+                handleNotControllerError(abstractResponse);
                 IncrementalAlterConfigsResponse response = 
(IncrementalAlterConfigsResponse) abstractResponse;
                 Map<ConfigResource, ApiError> errors = 
IncrementalAlterConfigsResponse.fromResponseData(response.data());
                 for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry : 
futures.entrySet()) {
@@ -4089,8 +4092,11 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     private void handleNotControllerError(AbstractResponse response) throws 
ApiException {
+        // When sending requests directly to the follower controller, it might 
return NOT_LEADER_OR_FOLLOWER error.
         if (response.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
             handleNotControllerError(Errors.NOT_CONTROLLER);
+        } else if (metadataManager.usingBootstrapControllers() && 
response.errorCounts().containsKey(Errors.NOT_LEADER_OR_FOLLOWER)) {
+            handleNotControllerError(Errors.NOT_LEADER_OR_FOLLOWER);
         }
     }
 
@@ -4652,6 +4658,7 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             void handleResponse(AbstractResponse response) {
+                handleNotControllerError(response);
                 final DescribeQuorumResponse quorumResponse = 
(DescribeQuorumResponse) response;
                 if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
                     throw 
Errors.forCode(quorumResponse.data().errorCode()).exception(quorumResponse.data().errorMessage());
@@ -4849,6 +4856,7 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             void handleResponse(AbstractResponse response) {
+                handleNotControllerError(response);
                 AddRaftVoterResponse addResponse = (AddRaftVoterResponse) 
response;
                 if (addResponse.data().errorCode() != Errors.NONE.code()) {
                     ApiError error = new ApiError(
@@ -4893,6 +4901,7 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             void handleResponse(AbstractResponse response) {
+                handleNotControllerError(response);
                 RemoveRaftVoterResponse addResponse = 
(RemoveRaftVoterResponse) response;
                 if (addResponse.data().errorCode() != Errors.NONE.code()) {
                     ApiError error = new ApiError(
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index 99c823e78f4..add2256b2b3 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -73,7 +73,8 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
 
         AdminMetadataManager metadataManager = new AdminMetadataManager(new 
LogContext(),
                 
adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
-                
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
+                
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG),
+                
config.containsKey(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG));
         this.mockClient = new MockClient(time, new 
MockClient.MockMetadataUpdater() {
             @Override
             public List<Node> fetchNodes() {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 07b4b55e268..2f0bdaa7a98 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1509,7 +1509,8 @@ public class KafkaAdminClientTest {
                     env.cluster().nodes(),
                     env.cluster().clusterResource().clusterId(),
                     2,
-                    MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)
+                    MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+                    false)
             );
 
             DescribeTopicPartitionsResponseData dataFirstPart = new 
DescribeTopicPartitionsResponseData();
@@ -1566,7 +1567,8 @@ public class KafkaAdminClientTest {
                             env.cluster().nodes(),
                             env.cluster().clusterResource().clusterId(),
                             2,
-                            authorisedOperations)
+                            authorisedOperations,
+                            false)
             );
 
             DescribeTopicPartitionsResponseData responseData = new 
DescribeTopicPartitionsResponseData();
@@ -1602,7 +1604,8 @@ public class KafkaAdminClientTest {
                             env.cluster().nodes(),
                             env.cluster().clusterResource().clusterId(),
                             2,
-                            authorisedOperations)
+                            authorisedOperations,
+                            false)
             );
 
             DescribeTopicPartitionsResponseData responseData = new 
DescribeTopicPartitionsResponseData();
@@ -1642,7 +1645,8 @@ public class KafkaAdminClientTest {
                     env.cluster().nodes(),
                     env.cluster().clusterResource().clusterId(),
                     2,
-                    MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)
+                    MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+                    false)
             );
 
             DescribeTopicPartitionsResponseData dataFirstPart = new 
DescribeTopicPartitionsResponseData();
@@ -1742,7 +1746,8 @@ public class KafkaAdminClientTest {
                     env.cluster().nodes(),
                     env.cluster().clusterResource().clusterId(),
                     2,
-                    MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)
+                    MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+                    false)
             );
 
             DescribeTopicPartitionsResponseData dataFirstPart = new 
DescribeTopicPartitionsResponseData();
@@ -1886,6 +1891,71 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testCreateAclsToController() throws Exception {
+        try (AdminClientUnitTestEnv env = 
mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+
+            env.kafkaClient().prepareResponse(new CreateAclsResponse(new 
CreateAclsResponseData().setResults(asList(
+                    new CreateAclsResponseData.AclCreationResult()
+                            .setErrorCode(Errors.NOT_CONTROLLER.code())
+                            .setErrorMessage("not controller")))));
+            // should retry the describe cluster to update the metadata
+            env.kafkaClient().prepareResponse(
+                    prepareDescribeClusterResponse(0,
+                            env.cluster().nodes(),
+                            env.cluster().clusterResource().clusterId(),
+                            2,
+                            MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+                            true)
+            );
+
+            // Test a call where we successfully create two ACLs.
+            env.kafkaClient().prepareResponse(new CreateAclsResponse(new 
CreateAclsResponseData().setResults(asList(
+                    new CreateAclsResponseData.AclCreationResult()))));
+
+            CreateAclsResult results = 
env.adminClient().createAcls(asList(ACL1));
+            assertCollectionIs(results.values().keySet(), ACL1);
+            for (KafkaFuture<Void> future : results.values().values())
+                future.get();
+            results.all().get();
+        }
+    }
+
+    @Test
+    public void testDeleteAclsToController() throws Exception {
+        try (AdminClientUnitTestEnv env = 
mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(new DeleteAclsResponse(new 
DeleteAclsResponseData()
+                    .setThrottleTimeMs(0)
+                    .setFilterResults(asList(new 
DeleteAclsResponseData.DeleteAclsFilterResult()
+                                    .setErrorCode(Errors.NOT_CONTROLLER.code())
+                                    .setErrorMessage("not controller"))),
+                    ApiKeys.DELETE_ACLS.latestVersion()));
+            // should retry the describe cluster to update the metadata
+            env.kafkaClient().prepareResponse(
+                    prepareDescribeClusterResponse(0,
+                            env.cluster().nodes(),
+                            env.cluster().clusterResource().clusterId(),
+                            2,
+                            MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+                            true)
+            );
+            // Test a call where there are no errors.
+            env.kafkaClient().prepareResponse(new DeleteAclsResponse(new 
DeleteAclsResponseData()
+                    .setThrottleTimeMs(0)
+                    .setFilterResults(asList(
+                            new DeleteAclsResponseData.DeleteAclsFilterResult()
+                                    
.setMatchingAcls(singletonList(DeleteAclsResponse.matchingAcl(ACL1, 
ApiError.NONE))))),
+                    ApiKeys.DELETE_ACLS.latestVersion()));
+            DeleteAclsResult results = 
env.adminClient().deleteAcls(asList(FILTER1));
+            Collection<AclBinding> deleted = results.all().get();
+            assertCollectionIs(deleted, ACL1);
+        }
+    }
+
     @Test
     public void testCreateAcls() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -2809,7 +2879,8 @@ public class KafkaAdminClientTest {
                     env.cluster().nodes(),
                     env.cluster().clusterResource().clusterId(),
                     2,
-                    MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED));
+                    MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+                    false));
 
             // Prepare the describe cluster response used for the second 
describe cluster
             env.kafkaClient().prepareResponse(
@@ -2817,7 +2888,8 @@ public class KafkaAdminClientTest {
                     env.cluster().nodes(),
                     env.cluster().clusterResource().clusterId(),
                     3,
-                    1 << AclOperation.DESCRIBE.code() | 1 << 
AclOperation.ALTER.code()));
+                    1 << AclOperation.DESCRIBE.code() | 1 << 
AclOperation.ALTER.code(),
+                    false));
 
             // Test DescribeCluster with the authorized operations omitted.
             final DescribeClusterResult result = 
env.adminClient().describeCluster();
@@ -2866,7 +2938,8 @@ public class KafkaAdminClientTest {
         Collection<Node> brokers,
         String clusterId,
         int controllerId,
-        int clusterAuthorizedOperations
+        int clusterAuthorizedOperations,
+        boolean sentToController
     ) {
         DescribeClusterResponseData data = new DescribeClusterResponseData()
             .setErrorCode(Errors.NONE.code())
@@ -2875,6 +2948,10 @@ public class KafkaAdminClientTest {
             .setClusterId(clusterId)
             .setClusterAuthorizedOperations(clusterAuthorizedOperations);
 
+        if (sentToController) {
+            data.setEndpointType(EndpointType.CONTROLLER.id());
+        }
+
         brokers.forEach(broker ->
             data.brokers().add(new DescribeClusterBroker()
                 .setHost(broker.host())
@@ -5677,6 +5754,51 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testIncrementalAlterConfigsToController()  throws Exception {
+        try (AdminClientUnitTestEnv env = 
mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            //test NOT_CONTROLLER error scenarios
+            IncrementalAlterConfigsResponseData responseData =  new 
IncrementalAlterConfigsResponseData();
+            responseData.responses().add(new AlterConfigsResourceResponse()
+                    .setResourceName("")
+                    .setResourceType(ConfigResource.Type.BROKER.id())
+                    .setErrorCode(Errors.NOT_CONTROLLER.code())
+                    .setErrorMessage("not controller"));
+
+            env.kafkaClient().prepareResponse(new 
IncrementalAlterConfigsResponse(responseData));
+
+            // should retry the describe cluster to update the metadata
+            env.kafkaClient().prepareResponse(
+                    prepareDescribeClusterResponse(0,
+                            env.cluster().nodes(),
+                            env.cluster().clusterResource().clusterId(),
+                            2,
+                            MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+                            true)
+            );
+
+            IncrementalAlterConfigsResponseData responseData2 =  new 
IncrementalAlterConfigsResponseData();
+            responseData2.responses().add(new AlterConfigsResourceResponse()
+                    .setResourceName("")
+                    .setResourceType(ConfigResource.Type.BROKER.id())
+                    .setErrorCode(Errors.NONE.code())
+                    .setErrorMessage(ApiError.NONE.message()));
+
+            ConfigResource brokerResource = new 
ConfigResource(ConfigResource.Type.BROKER, "");
+
+            AlterConfigOp alterConfigOp1 = new AlterConfigOp(
+                    new ConfigEntry("log.segment.bytes", "1073741"),
+                    AlterConfigOp.OpType.SET);
+
+            final Map<ConfigResource, Collection<AlterConfigOp>> configs = new 
HashMap<>();
+            configs.put(brokerResource, singletonList(alterConfigOp1));
+            env.kafkaClient().prepareResponse(new 
IncrementalAlterConfigsResponse(responseData2));
+            env.adminClient().incrementalAlterConfigs(configs).all().get();
+        }
+    }
+
     @Test
     public void testRemoveMembersFromGroupNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -8889,7 +9011,7 @@ public class KafkaAdminClientTest {
     @ParameterizedTest
     @CsvSource({ "false, false", "false, true", "true, false", "true, true" })
     public void testAddRaftVoterRequest(boolean fail, boolean sendClusterId) 
throws Exception {
-        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = 
mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) {
             AddRaftVoterResponseData responseData = new 
AddRaftVoterResponseData();
             if (fail) {
                 responseData.
@@ -8930,13 +9052,52 @@ public class KafkaAdminClientTest {
                     setName("CONTROLLER").
                     setHost("example.com").
                     setPort(8080), 
requestData.get().listeners().find("CONTROLLER"));
+
+            // In the fail case, we continue to test the 
`NOT_LEADER_OR_FOLLOWER` error case
+            if (fail && !sendClusterId) {
+                responseData.
+                        setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()).
+                        setErrorMessage("test");
+                env.kafkaClient().prepareResponse(
+                        request -> {
+                            if (!(request instanceof AddRaftVoterRequest)) 
return false;
+                            requestData.set((AddRaftVoterRequestData) 
request.data());
+                            return true;
+                        },
+                        new AddRaftVoterResponse(responseData));
+
+                // should retry the describe cluster to update the metadata
+                env.kafkaClient().prepareResponse(
+                        prepareDescribeClusterResponse(0,
+                                env.cluster().nodes(),
+                                env.cluster().clusterResource().clusterId(),
+                                2,
+                                MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+                                true)
+                );
+
+                AddRaftVoterResponseData responseData2 = new 
AddRaftVoterResponseData();
+                env.kafkaClient().prepareResponse(
+                        request -> {
+                            if (!(request instanceof AddRaftVoterRequest)) 
return false;
+                            requestData.set((AddRaftVoterRequestData) 
request.data());
+                            return true;
+                        },
+                        new AddRaftVoterResponse(responseData2));
+
+                AddRaftVoterResult result2 = env.adminClient().addRaftVoter(1,
+                        Uuid.fromString("YAfa4HClT3SIIW2klIUspg"),
+                        Collections.singleton(new 
RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
+                        options);
+                result2.all().get();
+            }
         }
     }
 
     @ParameterizedTest
     @CsvSource({ "false, false", "false, true", "true, false", "true, true" })
     public void testRemoveRaftVoterRequest(boolean fail, boolean 
sendClusterId) throws Exception {
-        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = 
mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) {
             RemoveRaftVoterResponseData responseData = new 
RemoveRaftVoterResponseData();
             if (fail) {
                 responseData.
@@ -8971,6 +9132,44 @@ public class KafkaAdminClientTest {
             }
             assertEquals(1, requestData.get().voterId());
             assertEquals(Uuid.fromString("YAfa4HClT3SIIW2klIUspg"), 
requestData.get().voterDirectoryId());
+
+            // In the fail case, we continue to test the 
`NOT_LEADER_OR_FOLLOWER` error case
+            if (fail && !sendClusterId) {
+                responseData.
+                        setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()).
+                        setErrorMessage("test");
+                env.kafkaClient().prepareResponse(
+                        request -> {
+                            if (!(request instanceof RemoveRaftVoterRequest)) 
return false;
+                            requestData.set((RemoveRaftVoterRequestData) 
request.data());
+                            return true;
+                        },
+                        new RemoveRaftVoterResponse(responseData));
+
+                // should retry the describe cluster to update the metadata
+                env.kafkaClient().prepareResponse(
+                        prepareDescribeClusterResponse(0,
+                                env.cluster().nodes(),
+                                env.cluster().clusterResource().clusterId(),
+                                2,
+                                MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED,
+                                true)
+                );
+
+                RemoveRaftVoterResponseData responseData2 = new 
RemoveRaftVoterResponseData();
+                env.kafkaClient().prepareResponse(
+                        request -> {
+                            if (!(request instanceof RemoveRaftVoterRequest)) 
return false;
+                            requestData.set((RemoveRaftVoterRequestData) 
request.data());
+                            return true;
+                        },
+                        new RemoveRaftVoterResponse(responseData2));
+
+                RemoveRaftVoterResult result2 = 
env.adminClient().removeRaftVoter(1,
+                        Uuid.fromString("YAfa4HClT3SIIW2klIUspg"),
+                        options);
+                result2.all().get();
+            }
         }
     }
 }
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 7e0b10d1d6a..44bfa9f1734 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -872,9 +872,12 @@ class KRaftClusterTest {
     }
   }
 
-  def createAdminClient(cluster: KafkaClusterTestKit): Admin = {
+  def createAdminClient(cluster: KafkaClusterTestKit, bootstrapController: 
Boolean): Admin = {
     var props: Properties = null
-    props = cluster.clientProperties()
+    props = if (bootstrapController)
+      
cluster.newClientPropertiesBuilder().setUsingBootstrapControllers(true).build()
+    else
+      cluster.clientProperties()
     props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
     Admin.create(props)
   }
@@ -892,7 +895,7 @@ class KRaftClusterTest {
         TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == 
BrokerState.RUNNING,
           "Broker Never started up")
       }
-      val admin = createAdminClient(cluster)
+      val admin = createAdminClient(cluster, bootstrapController = false)
       try {
         val quorumState = admin.describeMetadataQuorum(new 
DescribeMetadataQuorumOptions)
         val quorumInfo = quorumState.quorumInfo.get()
@@ -936,6 +939,47 @@ class KRaftClusterTest {
     }
   }
 
+  @Test
+  def testDescribeQuorumRequestToControllers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == 
BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val admin = createAdminClient(cluster, bootstrapController = true)
+      try {
+        val quorumInfo = admin.describeMetadataQuorum(new 
DescribeMetadataQuorumOptions).quorumInfo.get()
+
+        assertEquals(cluster.controllers.asScala.keySet, 
quorumInfo.voters.asScala.map(_.replicaId).toSet)
+        
assertTrue(cluster.controllers.asScala.keySet.contains(quorumInfo.leaderId),
+          s"Leader ID ${quorumInfo.leaderId} was not a controller ID.")
+
+        // Try to bring down the raft client in the active controller node to 
force the leader election.
+        
cluster.controllers().get(quorumInfo.leaderId).sharedServer.raftManager.client.shutdown(1000)
+        // Send another describe metadata quorum request, it'll get 
NOT_LEADER_OR_FOLLOWER error first and then re-retrieve the metadata update
+        // and send to the correct active controller.
+        val quorumInfo2 = admin.describeMetadataQuorum(new 
DescribeMetadataQuorumOptions)
+          .quorumInfo().get()
+        // Make sure the leader has changed
+        assertTrue(quorumInfo.leaderId() != quorumInfo2.leaderId())
+
+        assertEquals(cluster.controllers.asScala.keySet, 
quorumInfo.voters.asScala.map(_.replicaId).toSet)
+        
assertTrue(cluster.controllers.asScala.keySet.contains(quorumInfo.leaderId),
+          s"Leader ID ${quorumInfo.leaderId} was not a controller ID.")
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
   @Test
   def testUpdateMetadataVersion(): Unit = {
     val cluster = new KafkaClusterTestKit.Builder(

Reply via email to