This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e0c77140b2c KAFKA-17039 KIP-919 supports for unregisterBroker (#19063)
e0c77140b2c is described below

commit e0c77140b2cce4bcc4d9d983536d6cfa3df070ba
Author: TengYao Chi <[email protected]>
AuthorDate: Sat Mar 1 23:55:35 2025 +0800

    KAFKA-17039 KIP-919 supports for unregisterBroker (#19063)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../java/org/apache/kafka/clients/admin/KafkaAdminClient.java |  2 +-
 .../scala/integration/kafka/server/KRaftClusterTest.scala     |  7 ++++---
 .../src/test/java/org/apache/kafka/tools/ClusterToolTest.java | 11 +----------
 3 files changed, 6 insertions(+), 14 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7737625078d..725e48c3656 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4729,7 +4729,7 @@ public class KafkaAdminClient extends AdminClient {
         final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         final Call call = new Call("unregisterBroker", calcDeadlineMs(now, options.timeoutMs()),
-                new LeastLoadedNodeProvider()) {
+                new LeastLoadedBrokerOrActiveKController()) {
 
             @Override
             UnregisterBrokerRequest.Builder createRequest(int timeoutMs) {
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 06cb2e2eebb..b4515f7919d 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -833,8 +833,9 @@ class KRaftClusterTest {
     Option(image.brokers().get(brokerId)).isEmpty
   }
 
-  @Test
-  def testUnregisterBroker(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testUnregisterBroker(usingBootstrapController: Boolean): Unit = {
     val cluster = new KafkaClusterTestKit.Builder(
       new TestKitNodes.Builder().
         setNumBrokerNodes(4).
@@ -848,7 +849,7 @@ class KRaftClusterTest {
       cluster.brokers().get(0).shutdown()
       TestUtils.waitUntilTrue(() => !brokerIsUnfenced(clusterImage(cluster, 1), 0),
         "Timed out waiting for broker 0 to be fenced.")
-      val admin = Admin.create(cluster.clientProperties())
+      val admin = createAdminClient(cluster, bootstrapController = usingBootstrapController);
       try {
         admin.unregisterBroker(0)
       } finally {
diff --git a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
index 8e7a77c2925..6a2fcc5150e 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.tools;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.MockAdminClient;
-import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.Type;
@@ -30,12 +29,10 @@ import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -102,13 +99,7 @@ public class ClusterToolTest {
         brokerIds.removeAll(clusterInstance.controllerIds());
         int brokerId = assertDoesNotThrow(() -> brokerIds.stream().findFirst().get());
         clusterInstance.shutdownBroker(brokerId);
-        ExecutionException exception =
-                assertThrows(ExecutionException.class,
-                        () -> ClusterTool.execute("unregister", "--bootstrap-controller", clusterInstance.bootstrapControllers(), "--id", String.valueOf(brokerId)));
-        assertNotNull(exception.getCause());
-        assertEquals(UnsupportedEndpointTypeException.class, exception.getCause().getClass());
-        assertEquals("This Admin API is not yet supported when communicating directly with " +
-                "the controller quorum.", exception.getCause().getMessage());
+        assertDoesNotThrow(() -> ClusterTool.execute("unregister", "--bootstrap-controller", clusterInstance.bootstrapControllers(), "--id", String.valueOf(brokerId)));
     }
 
     @ClusterTest(brokers = 3, types = {Type.KRAFT, Type.CO_KRAFT})

Reply via email to