This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 832dfa36da7 KAFKA-18637: Fix max connections per ip and override 
reconfigurations (#19099)
832dfa36da7 is described below

commit 832dfa36da7b7b7bc3ebe2ed1afec51c76f9a7cc
Author: Azhar Ahmed <[email protected]>
AuthorDate: Mon Mar 10 00:27:48 2025 -0700

    KAFKA-18637: Fix max connections per ip and override reconfigurations 
(#19099)
    
    Reviewers: Christo Lolov <[email protected]>, TengYao Chi 
<[email protected]>, Ken Huang <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 ++--
 .../server/DynamicBrokerReconfigurationTest.scala  | 24 ++++++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6d120d5b609..f38e62833cb 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -309,8 +309,8 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   val socketReceiveBufferBytes = 
getInt(SocketServerConfigs.SOCKET_RECEIVE_BUFFER_BYTES_CONFIG)
   val socketRequestMaxBytes = 
getInt(SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_CONFIG)
   val socketListenBacklogSize = 
getInt(SocketServerConfigs.SOCKET_LISTEN_BACKLOG_SIZE_CONFIG)
-  val maxConnectionsPerIp = 
getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG)
-  val maxConnectionsPerIpOverrides: Map[String, Int] =
+  def maxConnectionsPerIp = 
getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG)
+  def maxConnectionsPerIpOverrides: Map[String, Int] =
     getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, 
getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG)).map { 
case (k, v) => (k, v.toInt)}
   def maxConnections = getInt(SocketServerConfigs.MAX_CONNECTIONS_CONFIG)
   def maxConnectionCreationRate = 
getInt(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG)
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index f4de50d7cd8..8b9fa754a2a 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -482,6 +482,30 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSocketServerConfigTest(quorum: String, groupProtocol: String): Unit 
= {
+    val updatedMaxConnections = "20"
+    val connectionsIpsOverride = "1.2.3.4:1234,1.2.4.5:2345"
+    val properties = new Properties()
+    properties.setProperty(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG, 
updatedMaxConnections)
+    
properties.setProperty(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
 connectionsIpsOverride)
+
+    TestUtils.incrementalAlterConfigs(servers, adminClients.head, properties, 
true)
+
+    servers.foreach(_.shutdown())
+    servers.foreach(_.awaitShutdown())
+    servers.foreach(_.startup())
+
+    servers.foreach { broker =>
+      assertEquals(updatedMaxConnections, broker.config.originals()
+        .get(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG).toString)
+
+      assertEquals(connectionsIpsOverride, broker.config.originals()
+        
.get(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG).toString)
+    }
+  }
+
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testLogCleanerConfig(quorum: String, groupProtocol: String): Unit = {

Reply via email to