This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c9031c6245 KAFKA-15507: Make AdminClient throw non-retriable 
exception for a new call while closing (#14455)
3c9031c6245 is described below

commit 3c9031c62455e4eaa3f5d16a3bba94d7e3159fb6
Author: Gantigmaa Selenge <[email protected]>
AuthorDate: Wed Oct 11 04:41:46 2023 +0100

    KAFKA-15507: Make AdminClient throw non-retriable exception for a new call 
while closing (#14455)
    
    AdminClient will throw IllegalStateException instead of TimeoutException if 
it receives new calls while closing down. This is more consistent with how 
Consumer and Producer clients handle new calls after being closed.
    
    Reviewers: Luke Chen <[email protected]>, Kirk True <[email protected]>, 
Kamal Chandraprakash <[email protected]>, vamossagar12 
<[email protected]>
---
 .../org/apache/kafka/clients/admin/KafkaAdminClient.java     |  5 ++---
 .../org/apache/kafka/clients/admin/KafkaAdminClientTest.java | 12 ++++++++++++
 .../kafka/api/PlaintextAdminIntegrationTest.scala            |  4 ++--
 3 files changed, 16 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 4db6b271946..27d28b5b336 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1546,9 +1546,8 @@ public class KafkaAdminClient extends AdminClient {
          */
         void call(Call call, long now) {
             if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
-                log.debug("The AdminClient is not accepting new calls. Timing 
out {}.", call);
-                call.handleTimeoutFailure(time.milliseconds(),
-                    new TimeoutException("The AdminClient thread is not 
accepting new calls."));
+                log.debug("Cannot accept new call {} when AdminClient is 
closing.", call);
+                call.handleFailure(new IllegalStateException("Cannot accept 
new calls when AdminClient is closing."));
             } else if (metadataManager.usingBootstrapControllers() &&
                     (!call.nodeProvider.supportsUseControllers())) {
                 call.fail(now, new UnsupportedEndpointTypeException("This 
Admin API is not " +
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 229d3119871..378e08b2c45 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -478,6 +478,18 @@ public class KafkaAdminClientTest {
         callbackCalled.acquire();
     }
 
+    @Test
+    public void testAdminClientFailureWhenClosed() {
+        MockTime time = new MockTime();
+        AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
mockCluster(3, 0));
+        env.adminClient().close();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> 
env.adminClient().createTopics(
+                singleton(new NewTopic("myTopic", Collections.singletonMap(0, 
asList(0, 1, 2)))),
+                new CreateTopicsOptions().timeoutMs(10000)).all().get());
+        assertTrue(e.getCause() instanceof IllegalStateException,
+                "Expected an IllegalStateException error, but got " + 
Utils.stackTrace(e));
+    }
+
     private static OffsetDeleteResponse prepareOffsetDeleteResponse(Errors 
error) {
         return new OffsetDeleteResponse(
             new OffsetDeleteResponseData()
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 784374d23e8..5bb3533146c 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1026,7 +1026,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
   /**
     * Test closing the AdminClient with a generous timeout.  Calls in progress 
should be completed,
-    * since they can be done within the timeout.  New calls should receive 
timeouts.
+    * since they can be done within the timeout.  New calls should receive 
exceptions.
     */
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
@@ -1037,7 +1037,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val future = client.createTopics(newTopics.asJava, new 
CreateTopicsOptions().validateOnly(true)).all()
     client.close(time.Duration.ofHours(2))
     val future2 = client.createTopics(newTopics.asJava, new 
CreateTopicsOptions().validateOnly(true)).all()
-    assertFutureExceptionTypeEquals(future2, classOf[TimeoutException])
+    assertFutureExceptionTypeEquals(future2, classOf[IllegalStateException])
     future.get
     client.close(time.Duration.ofMinutes(30)) // multiple close-with-timeout 
should have no effect
   }

Reply via email to