This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c9a5f8603b6 MINOR: Migrate controllerSocketTimeoutMs from KafkaConfig 
to AbstractKafkaConfig (#22188)
c9a5f8603b6 is described below

commit c9a5f8603b66812e3ff2590413d9597949bb61cc
Author: Yunchi Pang <[email protected]>
AuthorDate: Fri May 8 13:29:34 2026 -0400

    MINOR: Migrate controllerSocketTimeoutMs from KafkaConfig to 
AbstractKafkaConfig (#22188)
    
    - Migrate `controllerSocketTimeoutMs` to `AbstractKafkaConfig`
    - Update usage in `NodeToControllerChannelManagerImpl`, and pass the
      timeout value (instead of the config object) to
      `NodeToControllerRequestThread`
    
    Reviewers: Murali Basani <[email protected]>, Ken Huang
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |  1 -
 .../server/NodeToControllerChannelManagerImpl.java |  5 +-
 .../server/NodeToControllerRequestThread.java      |  6 +-
 .../kafka/server/config/AbstractKafkaConfig.java   |  4 ++
 .../server/NodeToControllerRequestThreadTest.java  | 83 ++++++++++------------
 5 files changed, 44 insertions(+), 55 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6f8781b72e0..c8196a17d3a 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -236,7 +236,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   val replicaSelectorClassName = 
Option(getString(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG))
 
   /** ********* Replication configuration ***********/
-  val controllerSocketTimeoutMs: Int = 
getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG)
   val defaultReplicationFactor: Int = 
getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)
   val replicaLagTimeMaxMs = 
getLong(ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG)
   val replicaSocketTimeoutMs = 
getInt(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG)
diff --git 
a/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java
 
b/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java
index 1306da48c5a..42394a9074f 100644
--- 
a/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java
+++ 
b/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java
@@ -38,7 +38,6 @@ import org.apache.kafka.raft.KRaftConfigs;
 import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
 import org.apache.kafka.server.common.NodeToControllerChannelManager;
 import org.apache.kafka.server.config.AbstractKafkaConfig;
-import org.apache.kafka.server.config.ReplicationConfigs;
 import org.apache.kafka.server.config.ServerConfigs;
 
 import org.slf4j.Logger;
@@ -82,7 +81,7 @@ public class NodeToControllerChannelManagerImpl implements 
NodeToControllerChann
                 buildNetworkClient(controllerInformation),
                 manualMetadataUpdater,
                 controllerNodeProvider,
-                config,
+                config.controllerSocketTimeoutMs(),
                 time,
                 threadName,
                 retryTimeoutMs
@@ -122,7 +121,7 @@ public class NodeToControllerChannelManagerImpl implements 
NodeToControllerChann
                 50,
                 Selectable.USE_DEFAULT_BUFFER_SIZE,
                 Selectable.USE_DEFAULT_BUFFER_SIZE,
-                Math.min(Integer.MAX_VALUE, (int) 
Math.min(config.getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG), 
retryTimeoutMs)), // request timeout should not exceed the provided retry 
timeout
+                Math.min(Integer.MAX_VALUE, (int) 
Math.min(config.controllerSocketTimeoutMs(), retryTimeoutMs)), // request 
timeout should not exceed the provided retry timeout
                 
config.getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
                 
config.getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
                 time,
diff --git 
a/server/src/main/java/org/apache/kafka/server/NodeToControllerRequestThread.java
 
b/server/src/main/java/org/apache/kafka/server/NodeToControllerRequestThread.java
index e3d870999b2..6f4a85b3aa3 100644
--- 
a/server/src/main/java/org/apache/kafka/server/NodeToControllerRequestThread.java
+++ 
b/server/src/main/java/org/apache/kafka/server/NodeToControllerRequestThread.java
@@ -21,10 +21,8 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.ManualMetadataUpdater;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.server.config.ReplicationConfigs;
 import org.apache.kafka.server.util.InterBrokerSendThread;
 import org.apache.kafka.server.util.RequestAndCompletionHandler;
 
@@ -67,11 +65,11 @@ public class NodeToControllerRequestThread extends 
InterBrokerSendThread {
     public NodeToControllerRequestThread(KafkaClient initialNetworkClient,
                                          ManualMetadataUpdater metadataUpdater,
                                          Supplier<ControllerInformation> 
controllerNodeProvider,
-                                         AbstractConfig config,
+                                         int controllerSocketTimeoutMs,
                                          Time time,
                                          String threadName,
                                          Long retryTimeoutMs) {
-        super(threadName, initialNetworkClient, Math.min(Integer.MAX_VALUE, 
(int) 
Math.min(config.getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG), 
retryTimeoutMs)), time, false);
+        super(threadName, initialNetworkClient, Math.min(Integer.MAX_VALUE, 
(int) Math.min(controllerSocketTimeoutMs, retryTimeoutMs)), time, false);
         this.time = time;
         this.controllerNodeProvider = controllerNodeProvider;
         this.metadataUpdater = metadataUpdater;
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index d129fdcd65b..ac5e33dc70b 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -113,6 +113,10 @@ public abstract class AbstractKafkaConfig extends 
AbstractConfig {
         return getInt(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG);
     }
 
+    public int controllerSocketTimeoutMs() {
+        return getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG);
+    }
+
     public int numRecoveryThreadsPerDataDir() {
         return 
getInt(ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG);
     }
diff --git 
a/server/src/test/java/org/apache/kafka/server/NodeToControllerRequestThreadTest.java
 
b/server/src/test/java/org/apache/kafka/server/NodeToControllerRequestThreadTest.java
index 5341db3c12d..2814c343dbc 100644
--- 
a/server/src/test/java/org/apache/kafka/server/NodeToControllerRequestThreadTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/NodeToControllerRequestThreadTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.message.EnvelopeResponseData;
 import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.network.ListenerName;
@@ -60,10 +59,6 @@ import static org.mockito.Mockito.mock;
 
 class NodeToControllerRequestThreadTest {
 
-    private static AbstractConfig createConfig() {
-        return new AbstractConfig(ReplicationConfigs.CONFIG_DEF, Map.of());
-    }
-
     private static ControllerInformation controllerInfo(Optional<Node> node) {
         return new ControllerInformation(node, new ListenerName(""), 
SecurityProtocol.PLAINTEXT, "");
     }
@@ -86,20 +81,36 @@ class NodeToControllerRequestThreadTest {
         return () -> ref.getAndSet(second);
     }
 
+    private static NodeToControllerRequestThread createAndStartRequestThread(
+            MockClient mockClient,
+            Supplier<ControllerInformation> controllerNodeProvider,
+            MockTime time,
+            long retryTimeoutMs) {
+        NodeToControllerRequestThread thread = new 
NodeToControllerRequestThread(
+            mockClient, new ManualMetadataUpdater(),
+            controllerNodeProvider, 
ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, time, "", 
retryTimeoutMs);
+        thread.setStarted(true);
+        return thread;
+    }
+
+    private static NodeToControllerRequestThread createAndStartRequestThread(
+            MockClient mockClient,
+            Supplier<ControllerInformation> controllerNodeProvider,
+            MockTime time) {
+        return createAndStartRequestThread(mockClient, controllerNodeProvider, 
time, Long.MAX_VALUE);
+    }
+
     @Test
     void testRetryTimeoutWhileControllerNotAvailable() {
         MockTime time = new MockTime();
-        AbstractConfig config = createConfig();
         Metadata metadata = mock(Metadata.class);
         MockClient mockClient = new MockClient(time, metadata);
 
         Supplier<ControllerInformation> controllerNodeProvider = 
NodeToControllerRequestThreadTest::emptyControllerInfo;
 
         long retryTimeoutMs = 30000;
-        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
-            mockClient, new ManualMetadataUpdater(),
-            controllerNodeProvider, config, time, "", retryTimeoutMs);
-        testRequestThread.setStarted(true);
+        NodeToControllerRequestThread testRequestThread = 
createAndStartRequestThread(
+            mockClient, controllerNodeProvider, time, retryTimeoutMs);
 
         TestControllerRequestCompletionHandler completionHandler =
             new TestControllerRequestCompletionHandler(null);
@@ -123,7 +134,6 @@ class NodeToControllerRequestThreadTest {
     void testRequestsSent() {
         // just a simple test that tests whether the request from 1 -> 2 is 
sent and the response callback is called
         MockTime time = new MockTime();
-        AbstractConfig config = createConfig();
         int controllerId = 2;
 
         Metadata metadata = mock(Metadata.class);
@@ -134,10 +144,8 @@ class NodeToControllerRequestThreadTest {
             () -> controllerInfo(Optional.of(activeController));
 
         MetadataResponse expectedResponse = 
RequestTestUtils.metadataUpdateWith(2, Map.of("a", 2));
-        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
-            mockClient, new ManualMetadataUpdater(),
-            controllerNodeProvider, config, time, "", Long.MAX_VALUE);
-        testRequestThread.setStarted(true);
+        NodeToControllerRequestThread testRequestThread = 
createAndStartRequestThread(
+            mockClient, controllerNodeProvider, time);
         mockClient.prepareResponse(expectedResponse);
 
         TestControllerRequestCompletionHandler completionHandler =
@@ -164,7 +172,6 @@ class NodeToControllerRequestThreadTest {
     void testControllerChanged() {
         // in this test the controller changes from node 1 -> node 2
         MockTime time = new MockTime();
-        AbstractConfig config = createConfig();
         int oldControllerId = 1;
         int newControllerId = 2;
 
@@ -178,10 +185,8 @@ class NodeToControllerRequestThreadTest {
             controllerInfo(Optional.of(newController)));
 
         MetadataResponse expectedResponse = 
RequestTestUtils.metadataUpdateWith(3, Map.of("a", 2));
-        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
-            mockClient, new ManualMetadataUpdater(),
-            controllerNodeProvider, config, time, "", Long.MAX_VALUE);
-        testRequestThread.setStarted(true);
+        NodeToControllerRequestThread testRequestThread = 
createAndStartRequestThread(
+            mockClient, controllerNodeProvider, time);
 
         TestControllerRequestCompletionHandler completionHandler =
             new TestControllerRequestCompletionHandler(expectedResponse);
@@ -212,7 +217,6 @@ class NodeToControllerRequestThreadTest {
     @Test
     void testNotController() {
         MockTime time = new MockTime();
-        AbstractConfig config = createConfig();
         int oldControllerId = 1;
         int newControllerId = 2;
 
@@ -230,10 +234,8 @@ class NodeToControllerRequestThreadTest {
             Map.of("a", Errors.NOT_CONTROLLER),
             Map.of("a", 2));
         MetadataResponse expectedResponse = 
RequestTestUtils.metadataUpdateWith(3, Map.of("a", 2));
-        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
-            mockClient, new ManualMetadataUpdater(),
-            controllerNodeProvider, config, time, "", Long.MAX_VALUE);
-        testRequestThread.setStarted(true);
+        NodeToControllerRequestThread testRequestThread = 
createAndStartRequestThread(
+            mockClient, controllerNodeProvider, time);
 
         TestControllerRequestCompletionHandler completionHandler =
             new TestControllerRequestCompletionHandler(expectedResponse);
@@ -271,7 +273,6 @@ class NodeToControllerRequestThreadTest {
     @Test
     void testEnvelopeResponseWithNotControllerError() {
         MockTime time = new MockTime();
-        AbstractConfig config = createConfig();
         int oldControllerId = 1;
         int newControllerId = 2;
 
@@ -294,10 +295,8 @@ class NodeToControllerRequestThreadTest {
         // response for retry request after receiving NOT_CONTROLLER error
         MetadataResponse expectedResponse = 
RequestTestUtils.metadataUpdateWith(3, Map.of("a", 2));
 
-        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
-            mockClient, new ManualMetadataUpdater(),
-            controllerNodeProvider, config, time, "", Long.MAX_VALUE);
-        testRequestThread.setStarted(true);
+        NodeToControllerRequestThread testRequestThread = 
createAndStartRequestThread(
+            mockClient, controllerNodeProvider, time);
 
         TestControllerRequestCompletionHandler completionHandler =
             new TestControllerRequestCompletionHandler(expectedResponse);
@@ -343,7 +342,6 @@ class NodeToControllerRequestThreadTest {
     @Test
     void testRetryTimeout() {
         MockTime time = new MockTime();
-        AbstractConfig config = createConfig();
         int controllerId = 1;
 
         Metadata metadata = mock(Metadata.class);
@@ -357,10 +355,8 @@ class NodeToControllerRequestThreadTest {
         MetadataResponse responseWithNotControllerError = 
RequestTestUtils.metadataUpdateWith("cluster1", 2,
             Map.of("a", Errors.NOT_CONTROLLER),
             Map.of("a", 2));
-        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
-            mockClient, new ManualMetadataUpdater(),
-            controllerNodeProvider, config, time, "", retryTimeoutMs);
-        testRequestThread.setStarted(true);
+        NodeToControllerRequestThread testRequestThread = 
createAndStartRequestThread(
+            mockClient, controllerNodeProvider, time, retryTimeoutMs);
 
         TestControllerRequestCompletionHandler completionHandler =
             new TestControllerRequestCompletionHandler();
@@ -391,7 +387,6 @@ class NodeToControllerRequestThreadTest {
     @Test
     void testUnsupportedVersionHandling() {
         MockTime time = new MockTime();
-        AbstractConfig config = createConfig();
         int controllerId = 2;
 
         Metadata metadata = mock(Metadata.class);
@@ -422,10 +417,8 @@ class NodeToControllerRequestThreadTest {
 
         mockClient.prepareUnsupportedVersionResponse(request -> 
request.apiKey() == ApiKeys.METADATA);
 
-        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
-            mockClient, new ManualMetadataUpdater(),
-            controllerNodeProvider, config, time, "", Long.MAX_VALUE);
-        testRequestThread.setStarted(true);
+        NodeToControllerRequestThread testRequestThread = 
createAndStartRequestThread(
+            mockClient, controllerNodeProvider, time);
 
         testRequestThread.enqueue(queueItem);
         pollUntil(testRequestThread, () -> callbackResponse.get() != null);
@@ -435,7 +428,6 @@ class NodeToControllerRequestThreadTest {
     @Test
     void testAuthenticationExceptionHandling() {
         MockTime time = new MockTime();
-        AbstractConfig config = createConfig();
         int controllerId = 2;
 
         Metadata metadata = mock(Metadata.class);
@@ -466,10 +458,8 @@ class NodeToControllerRequestThreadTest {
 
         mockClient.createPendingAuthenticationError(activeController, 50);
 
-        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
-            mockClient, new ManualMetadataUpdater(),
-            controllerNodeProvider, config, time, "", Long.MAX_VALUE);
-        testRequestThread.setStarted(true);
+        NodeToControllerRequestThread testRequestThread = 
createAndStartRequestThread(
+            mockClient, controllerNodeProvider, time);
 
         testRequestThread.enqueue(queueItem);
         pollUntil(testRequestThread, () -> callbackResponse.get() != null);
@@ -481,7 +471,6 @@ class NodeToControllerRequestThreadTest {
     void testThreadNotStarted() {
         // Make sure we throw if we enqueue anything while the thread is not 
running
         MockTime time = new MockTime();
-        AbstractConfig config = createConfig();
 
         Metadata metadata = mock(Metadata.class);
         MockClient mockClient = new MockClient(time, metadata);
@@ -490,7 +479,7 @@ class NodeToControllerRequestThreadTest {
 
         NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
             mockClient, new ManualMetadataUpdater(),
-            controllerNodeProvider, config, time, "", Long.MAX_VALUE);
+            controllerNodeProvider, 
ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, time, "", 
Long.MAX_VALUE);
 
         TestControllerRequestCompletionHandler completionHandler =
             new TestControllerRequestCompletionHandler(null);

Reply via email to