This is an automated email from the ASF dual-hosted git repository.
clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 832dfa36da7 KAFKA-18637: Fix max connections per ip and override
reconfigurations (#19099)
832dfa36da7 is described below
commit 832dfa36da7b7b7bc3ebe2ed1afec51c76f9a7cc
Author: Azhar Ahmed <[email protected]>
AuthorDate: Mon Mar 10 00:27:48 2025 -0700
KAFKA-18637: Fix max connections per ip and override reconfigurations
(#19099)
Reviewers: Christo Lolov <[email protected]>, TengYao Chi
<[email protected]>, Ken Huang <[email protected]>
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 4 ++--
.../server/DynamicBrokerReconfigurationTest.scala | 24 ++++++++++++++++++++++
2 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6d120d5b609..f38e62833cb 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -309,8 +309,8 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
val socketReceiveBufferBytes =
getInt(SocketServerConfigs.SOCKET_RECEIVE_BUFFER_BYTES_CONFIG)
val socketRequestMaxBytes =
getInt(SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_CONFIG)
val socketListenBacklogSize =
getInt(SocketServerConfigs.SOCKET_LISTEN_BACKLOG_SIZE_CONFIG)
- val maxConnectionsPerIp =
getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG)
- val maxConnectionsPerIpOverrides: Map[String, Int] =
+ def maxConnectionsPerIp =
getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG)
+ def maxConnectionsPerIpOverrides: Map[String, Int] =
getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG)).map {
case (k, v) => (k, v.toInt)}
def maxConnections = getInt(SocketServerConfigs.MAX_CONNECTIONS_CONFIG)
def maxConnectionCreationRate =
getInt(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG)
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index f4de50d7cd8..8b9fa754a2a 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -482,6 +482,30 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testSocketServerConfigTest(quorum: String, groupProtocol: String): Unit
= {
+ val updatedMaxConnections = "20"
+ val connectionsIpsOverride = "1.2.3.4:1234,1.2.4.5:2345"
+ val properties = new Properties()
+ properties.setProperty(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG,
updatedMaxConnections)
+
properties.setProperty(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
connectionsIpsOverride)
+
+ TestUtils.incrementalAlterConfigs(servers, adminClients.head, properties,
true)
+
+ servers.foreach(_.shutdown())
+ servers.foreach(_.awaitShutdown())
+ servers.foreach(_.startup())
+
+ servers.foreach { broker =>
+ assertEquals(updatedMaxConnections, broker.config.originals()
+ .get(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG).toString)
+
+ assertEquals(connectionsIpsOverride, broker.config.originals()
+
.get(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG).toString)
+ }
+ }
+
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testLogCleanerConfig(quorum: String, groupProtocol: String): Unit = {