This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 25d09060299 MINOR: Move testCallInFlightTimeouts to Java and remove
internal factory hack (#22220)
25d09060299 is described below
commit 25d09060299819f671daa0a3220d3d711868f238
Author: Jiayao Sun <[email protected]>
AuthorDate: Fri May 8 17:03:38 2026 +1200
MINOR: Move testCallInFlightTimeouts to Java and remove internal factory
hack (#22220)
Moves `testCallInFlightTimeouts` from the Scala core module to a new
Java `AdminClientTimeoutIntegrationTest` in `clients-integration-tests`.
By placing the test in the `org.apache.kafka.clients.admin` package, it
can directly access `KafkaAdminClient.createInternal()`. This allows us
to:
1. Delete the `KafkaAdminClientInternalFactory` backdoor, restoring
proper encapsulation.
2. Inline `FailureInjectingTimeoutProcessorFactory` into the test class,
reducing test fixture clutter.
Reviewers: Chia-Ping Tsai <[email protected]>, Igor Soarez
<[email protected]>
---
.../admin/AdminClientTimeoutIntegrationTest.java | 121 +++++++++++++++++++++
.../FailureInjectingTimeoutProcessorFactory.java | 64 -----------
.../admin/KafkaAdminClientInternalFactory.java | 23 ----
.../kafka/api/PlaintextAdminIntegrationTest.scala | 19 +---
4 files changed, 122 insertions(+), 105 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientTimeoutIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientTimeoutIntegrationTest.java
new file mode 100644
index 00000000000..9503b887c9e
--- /dev/null
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientTimeoutIntegrationTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@ClusterTestDefaults(types = { Type.KRAFT })
+public class AdminClientTimeoutIntegrationTest {
+
+ private static final Logger log =
LoggerFactory.getLogger(AdminClientTimeoutIntegrationTest.class);
+
+ private final ClusterInstance clusterInstance;
+
+ public AdminClientTimeoutIntegrationTest(ClusterInstance clusterInstance) {
+ this.clusterInstance = clusterInstance;
+ }
+
+ /**
+ * Test injecting timeouts for calls that are in flight.
+ */
+ @ClusterTest
+ public void testCallInFlightTimeouts() throws Exception {
+ var config = Map.of(
+ AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers(),
+ AdminClientConfig.RETRIES_CONFIG, "0",
+ // Set an extremely large overall API timeout to ensure the
+ // FailureInjectingTimeoutProcessor triggers before the
API-level timeout does.
+ AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000"
+ );
+
+ var factory = new FailureInjectingTimeoutProcessorFactory();
+ try (var client = KafkaAdminClient.createInternal(new
AdminClientConfig(config), factory)) {
+
+ var future = client.createTopics(Stream.of("mytopic1", "mytopic2")
+ .map(t -> new NewTopic(t, 1, (short) 1)).toList(),
+ new CreateTopicsOptions().validateOnly(true)).all();
+
+ var e = assertThrows(ExecutionException.class, future::get);
+ assertInstanceOf(TimeoutException.class, e.getCause());
+
+ var future2 = client.createTopics(Stream.of("mytopic3", "mytopic4")
+ .map(t -> new NewTopic(t, 1, (short) 1)).toList(),
+ new CreateTopicsOptions().validateOnly(true)).all();
+ future2.get();
+ assertEquals(1, factory.failuresInjected());
+ }
+ }
+
+
+ static class FailureInjectingTimeoutProcessorFactory extends
KafkaAdminClient.TimeoutProcessorFactory {
+
+ private int numTries = 0;
+
+ private int failuresInjected = 0;
+
+ @Override
+ KafkaAdminClient.TimeoutProcessor create(long now) {
+ return new FailureInjectingTimeoutProcessor(now);
+ }
+
+ synchronized boolean shouldInjectFailure() {
+ numTries++;
+ if (numTries == 1) {
+ failuresInjected++;
+ return true;
+ }
+ return false;
+ }
+
+ synchronized int failuresInjected() {
+ return failuresInjected;
+ }
+
+ final class FailureInjectingTimeoutProcessor extends
KafkaAdminClient.TimeoutProcessor {
+ FailureInjectingTimeoutProcessor(long now) {
+ super(now);
+ }
+
+ @Override
+ boolean callHasExpired(KafkaAdminClient.Call call) {
+ if ((!call.isInternal()) && shouldInjectFailure()) {
+ log.debug("Injecting timeout for {}.", call);
+ return true;
+ } else {
+ boolean ret = super.callHasExpired(call);
+ log.debug("callHasExpired({}) = {}", call, ret);
+ return ret;
+ }
+ }
+ }
+ }
+}
diff --git
a/clients/src/testFixtures/java/org/apache/kafka/clients/admin/FailureInjectingTimeoutProcessorFactory.java
b/clients/src/testFixtures/java/org/apache/kafka/clients/admin/FailureInjectingTimeoutProcessorFactory.java
deleted file mode 100644
index 61e9341944c..00000000000
---
a/clients/src/testFixtures/java/org/apache/kafka/clients/admin/FailureInjectingTimeoutProcessorFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FailureInjectingTimeoutProcessorFactory extends
KafkaAdminClient.TimeoutProcessorFactory {
-
- private static final Logger log =
LoggerFactory.getLogger(FailureInjectingTimeoutProcessorFactory.class);
-
- private int numTries = 0;
-
- private int failuresInjected = 0;
-
- @Override
- public KafkaAdminClient.TimeoutProcessor create(long now) {
- return new FailureInjectingTimeoutProcessor(now);
- }
-
- synchronized boolean shouldInjectFailure() {
- numTries++;
- if (numTries == 1) {
- failuresInjected++;
- return true;
- }
- return false;
- }
-
- public synchronized int failuresInjected() {
- return failuresInjected;
- }
-
- public final class FailureInjectingTimeoutProcessor extends
KafkaAdminClient.TimeoutProcessor {
- public FailureInjectingTimeoutProcessor(long now) {
- super(now);
- }
-
- boolean callHasExpired(KafkaAdminClient.Call call) {
- if ((!call.isInternal()) && shouldInjectFailure()) {
- log.debug("Injecting timeout for {}.", call);
- return true;
- } else {
- boolean ret = super.callHasExpired(call);
- log.debug("callHasExpired({}) = {}", call, ret);
- return ret;
- }
- }
- }
-}
diff --git
a/clients/src/testFixtures/java/org/apache/kafka/clients/admin/KafkaAdminClientInternalFactory.java
b/clients/src/testFixtures/java/org/apache/kafka/clients/admin/KafkaAdminClientInternalFactory.java
deleted file mode 100644
index 72d977e2adf..00000000000
---
a/clients/src/testFixtures/java/org/apache/kafka/clients/admin/KafkaAdminClientInternalFactory.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-public class KafkaAdminClientInternalFactory {
- public static KafkaAdminClient createInternal(AdminClientConfig config,
KafkaAdminClient.TimeoutProcessorFactory timeoutProcessorFactory) {
- return KafkaAdminClient.createInternal(config,
timeoutProcessorFactory);
- }
-}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index bcc2f8d4817..2e4b7acf26f 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1854,24 +1854,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least
one millisecond.")
}
- /**
- * Test injecting timeouts for calls that are in flight.
- */
- @Test
- def testCallInFlightTimeouts(): Unit = {
- val config = createConfig
- config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000")
- config.put(AdminClientConfig.RETRIES_CONFIG, "0")
- val factory = new FailureInjectingTimeoutProcessorFactory()
- client = KafkaAdminClientInternalFactory.createInternal(new
AdminClientConfig(config), factory)
- val future = client.createTopics(Seq("mytopic", "mytopic2").map(new
NewTopic(_, 1, 1.toShort)).asJava,
- new CreateTopicsOptions().validateOnly(true)).all()
- assertFutureThrows(classOf[TimeoutException], future)
- val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new
NewTopic(_, 1, 1.toShort)).asJava,
- new CreateTopicsOptions().validateOnly(true)).all()
- future2.get
- assertEquals(1, factory.failuresInjected)
- }
+
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))