This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 92004fa KAFKA-6751; Support dynamic configuration of
max.connections.per.ip/max.connections.per.ip.overrides configs (KIP-308)
(#5334)
92004fa is described below
commit 92004fa21a9cc75e3790ab39e44d4d3b754d95d9
Author: Manikumar Reddy O <[email protected]>
AuthorDate: Fri Aug 10 03:10:24 2018 +0530
KAFKA-6751; Support dynamic configuration of
max.connections.per.ip/max.connections.per.ip.overrides configs (KIP-308)
(#5334)
KIP-308 implementation. See
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=85474993.
Reviewers: Rajini Sivaram <[email protected]>, Jason Gustafson
<[email protected]>
---
.../java/org/apache/kafka/common/utils/Utils.java | 11 ++
.../org/apache/kafka/common/utils/UtilsTest.java | 11 ++
.../main/scala/kafka/network/SocketServer.scala | 28 +++-
.../scala/kafka/server/DynamicBrokerConfig.scala | 25 +++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 5 +
.../kafka/network/DynamicConnectionQuotaTest.scala | 143 +++++++++++++++++++++
.../kafka/server/DynamicBrokerConfigTest.scala | 16 +++
.../scala/unit/kafka/server/KafkaConfigTest.scala | 2 +
docs/configuration.html | 8 ++
9 files changed, 242 insertions(+), 7 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 6e0b693..a6d3e2c 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -68,6 +68,8 @@ public final class Utils {
// IPv6 is supported with [ip] pattern
private static final Pattern HOST_PORT_PATTERN =
Pattern.compile(".*?\\[?([0-9a-zA-Z\\-%._:]*)\\]?:([0-9]+)");
+ private static final Pattern VALID_HOST_CHARACTERS =
Pattern.compile("([0-9a-zA-Z\\-%._:]*)");
+
// Prints up to 2 decimal digits. Used for human readable printing
private static final DecimalFormat TWO_DIGIT_FORMAT = new
DecimalFormat("0.##");
@@ -436,6 +438,15 @@ public final class Utils {
}
/**
+ * Basic validation of the supplied address. checks for valid characters
+ * @param address hostname string to validate
+ * @return true if address contains valid characters
+ */
+ public static boolean validHostPattern(String address) {
+ return VALID_HOST_CHARACTERS.matcher(address).matches();
+ }
+
+ /**
* Formats hostname and port number as a "host:port" address string,
* surrounding IPv6 addresses with braces '[', ']'
* @param host hostname
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 3feeff2..4d1d830 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -39,6 +39,7 @@ import static
org.apache.kafka.common.utils.Utils.formatAddress;
import static org.apache.kafka.common.utils.Utils.formatBytes;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.common.utils.Utils.validHostPattern;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -60,6 +61,16 @@ public class UtilsTest {
}
@Test
+ public void testHostPattern() {
+ assertTrue(validHostPattern("127.0.0.1"));
+ assertTrue(validHostPattern("mydomain.com"));
+ assertTrue(validHostPattern("MyDomain.com"));
+ assertTrue(validHostPattern("My_Domain.com"));
+ assertTrue(validHostPattern("::1"));
+ assertTrue(validHostPattern("2001:db8:85a3:8d3:1319:8a2e:370"));
+ }
+
+ @Test
public void testGetPort() {
assertEquals(8000, getPort("127.0.0.1:8000").intValue());
assertEquals(8080, getPort("mydomain.com:8080").intValue());
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index 62fc7a5..749c921 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -57,9 +57,6 @@ class SocketServer(val config: KafkaConfig, val metrics:
Metrics, val time: Time
private val maxQueuedRequests = config.queuedMaxRequests
- private val maxConnectionsPerIp = config.maxConnectionsPerIp
- private val maxConnectionsPerIpOverrides =
config.maxConnectionsPerIpOverrides
-
private val logContext = new LogContext(s"[SocketServer
brokerId=${config.brokerId}] ")
this.logIdent = logContext.logPrefix
@@ -90,7 +87,7 @@ class SocketServer(val config: KafkaConfig, val metrics:
Metrics, val time: Time
*/
def startup(startupProcessors: Boolean = true) {
this.synchronized {
- connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp,
maxConnectionsPerIpOverrides)
+ connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp,
config.maxConnectionsPerIpOverrides)
createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
if (startupProcessors) {
startProcessors()
@@ -229,6 +226,16 @@ class SocketServer(val config: KafkaConfig, val metrics:
Metrics, val time: Time
}
}
+ def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
+ info(s"Updating maxConnectionsPerIp: $maxConnectionsPerIp")
+ connectionQuotas.updateMaxConnectionsPerIp(maxConnectionsPerIp)
+ }
+
+ def updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides:
Map[String, Int]): Unit = {
+ info(s"Updating maxConnectionsPerIpOverrides:
${maxConnectionsPerIpOverrides.map { case (k, v) => s"$k=$v" }.mkString(",")}")
+
connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
+ }
+
/* `protected` for test usage */
protected[network] def newProcessor(id: Int, connectionQuotas:
ConnectionQuotas, listenerName: ListenerName,
securityProtocol: SecurityProtocol,
memoryPool: MemoryPool): Processor = {
@@ -878,19 +885,28 @@ private[kafka] class Processor(val id: Int,
class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
- private val overrides = overrideQuotas.map { case (host, count) =>
(InetAddress.getByName(host), count) }
+ @volatile private var defaultMaxConnectionsPerIp = defaultMax
+ @volatile private var maxConnectionsPerIpOverrides = overrideQuotas.map {
case (host, count) => (InetAddress.getByName(host), count) }
private val counts = mutable.Map[InetAddress, Int]()
def inc(address: InetAddress) {
counts.synchronized {
val count = counts.getOrElseUpdate(address, 0)
counts.put(address, count + 1)
- val max = overrides.getOrElse(address, defaultMax)
+ val max = maxConnectionsPerIpOverrides.getOrElse(address,
defaultMaxConnectionsPerIp)
if (count >= max)
throw new TooManyConnectionsException(address, max)
}
}
+ def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
+ defaultMaxConnectionsPerIp = maxConnectionsPerIp
+ }
+
+ def updateMaxConnectionsPerIpOverride(overrideQuotas: Map[String, Int]):
Unit = {
+ maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) =>
(InetAddress.getByName(host), count) }
+ }
+
def dec(address: InetAddress) {
counts.synchronized {
val count = counts.getOrElse(address,
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 72772fa..19743e5 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -80,7 +80,8 @@ object DynamicBrokerConfig {
DynamicLogConfig.ReconfigurableConfigs ++
DynamicThreadPool.ReconfigurableConfigs ++
Set(KafkaConfig.MetricReporterClassesProp) ++
- DynamicListenerConfig.ReconfigurableConfigs
+ DynamicListenerConfig.ReconfigurableConfigs ++
+ DynamicConnectionQuota.ReconfigurableConfigs
private val PerBrokerConfigs = DynamicSecurityConfigs ++
DynamicListenerConfig.ReconfigurableConfigs
@@ -197,6 +198,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId,
kafkaServer))
addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId,
kafkaServer))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
+ addBrokerReconfigurable(new DynamicConnectionQuota(kafkaServer))
}
def addReconfigurable(reconfigurable: Reconfigurable): Unit =
CoreUtils.inWriteLock(lock) {
@@ -815,3 +817,24 @@ class DynamicListenerConfig(server: KafkaServer) extends
BrokerReconfigurable wi
}
+object DynamicConnectionQuota {
+ val ReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsPerIpProp,
KafkaConfig.MaxConnectionsPerIpOverridesProp)
+}
+
+class DynamicConnectionQuota(server: KafkaServer) extends BrokerReconfigurable
{
+
+ override def reconfigurableConfigs: Set[String] = {
+ DynamicConnectionQuota.ReconfigurableConfigs
+ }
+
+ override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+ }
+
+ override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig):
Unit = {
+
server.socketServer.updateMaxConnectionsPerIpOverride(newConfig.maxConnectionsPerIpOverrides)
+
+ if (newConfig.maxConnectionsPerIp != oldConfig.maxConnectionsPerIp)
+
server.socketServer.updateMaxConnectionsPerIp(newConfig.maxConnectionsPerIp)
+ }
+}
+
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3d7367e..b651549 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.{LegacyRecord, Records, TimestampType}
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
import scala.collection.Map
@@ -1392,5 +1393,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
if (maxConnectionsPerIp == 0)
require(!maxConnectionsPerIpOverrides.isEmpty,
s"${KafkaConfig.MaxConnectionsPerIpProp} can be set to zero only if" +
s" ${KafkaConfig.MaxConnectionsPerIpOverridesProp} property is set.")
+
+ val invalidAddresses = maxConnectionsPerIpOverrides.keys.filterNot(address
=> Utils.validHostPattern(address))
+ if (!invalidAddresses.isEmpty)
+ throw new
IllegalArgumentException(s"${KafkaConfig.MaxConnectionsPerIpOverridesProp}
contains invalid addresses : ${invalidAddresses.mkString(",")}")
}
}
diff --git
a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
new file mode 100644
index 0000000..374556b
--- /dev/null
+++
b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.io.IOException
+import java.net.{InetAddress, Socket}
+import java.util.Properties
+
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords,
SimpleRecord}
+import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.Assert.assertEquals
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConverters._
+
+class DynamicConnectionQuotaTest extends BaseRequestTest {
+
+ override def numBrokers = 1
+
+ val topic = "test"
+
+ @Before
+ override def setUp(): Unit = {
+ super.setUp()
+ TestUtils.createTopic(zkClient, topic, numBrokers, numBrokers, servers)
+ }
+
+ @Test
+ def testDynamicConnectionQuota(): Unit = {
+ def connect(socketServer: SocketServer, protocol: SecurityProtocol =
SecurityProtocol.PLAINTEXT, localAddr: InetAddress = null) = {
+ new Socket("localhost",
socketServer.boundPort(ListenerName.forSecurityProtocol(protocol)), localAddr,
0)
+ }
+
+ val socketServer = servers.head.socketServer
+ val localAddress = InetAddress.getByName("127.0.0.1")
+ def connectionCount = socketServer.connectionCount(localAddress)
+ val initialConnectionCount = connectionCount
+ val maxConnectionsPerIP = 5
+
+ val props = new Properties
+ props.put(KafkaConfig.MaxConnectionsPerIpProp,
maxConnectionsPerIP.toString)
+ reconfigureServers(props, perBrokerConfig = false,
(KafkaConfig.MaxConnectionsPerIpProp, maxConnectionsPerIP.toString))
+
+ //wait for adminClient connections to close
+ TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount,
"Connection count mismatch")
+
+ //create connections up to maxConnectionsPerIP - 1, leave space for one
connection
+ var conns = (connectionCount until (maxConnectionsPerIP - 1)).map(_ =>
connect(socketServer))
+
+ // produce should succeed
+ var produceResponse = sendProduceRequest()
+ assertEquals(1, produceResponse.responses.size)
+ val (tp, partitionResponse) = produceResponse.responses.asScala.head
+ assertEquals(Errors.NONE, partitionResponse.error)
+
+ conns = conns :+ connect(socketServer)
+ // now try one more (should fail)
+ intercept[IOException](sendProduceRequest())
+
+ conns.foreach(conn => conn.close())
+ TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount,
"Connection count mismatch")
+
+ // Increase MaxConnectionsPerIpOverrides for localhost to 7
+ val maxConnectionsPerIPOverride = 7
+ props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp,
s"localhost:$maxConnectionsPerIPOverride")
+ reconfigureServers(props, perBrokerConfig = false,
(KafkaConfig.MaxConnectionsPerIpOverridesProp,
s"localhost:$maxConnectionsPerIPOverride"))
+
+ //wait for adminClient connections to close
+ TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount,
"Connection count mismatch")
+
+ //create connections up to maxConnectionsPerIPOverride - 1, leave space
for one connection
+ conns = (connectionCount until maxConnectionsPerIPOverride - 1).map(_ =>
connect(socketServer))
+
+ // send should succeed
+ produceResponse = sendProduceRequest()
+ assertEquals(1, produceResponse.responses.size)
+ val (tp1, partitionResponse1) = produceResponse.responses.asScala.head
+ assertEquals(Errors.NONE, partitionResponse1.error)
+
+ conns = conns :+ connect(socketServer)
+ // now try one more (should fail)
+ intercept[IOException](sendProduceRequest())
+
+ //close one connection
+ conns.head.close()
+ // send should succeed
+ sendProduceRequest()
+ }
+
+ private def reconfigureServers(newProps: Properties, perBrokerConfig:
Boolean, aPropToVerify: (String, String)): Unit = {
+ val adminClient = createAdminClient()
+ TestUtils.alterConfigs(servers, adminClient, newProps,
perBrokerConfig).all.get()
+ waitForConfigOnServer(aPropToVerify._1, aPropToVerify._2)
+ adminClient.close()
+ }
+
+ private def createAdminClient(): AdminClient = {
+ val bootstrapServers = TestUtils.bootstrapServers(servers, new
ListenerName(securityProtocol.name))
+ val config = new Properties()
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+ config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
+ val adminClient = AdminClient.create(config)
+ adminClient
+ }
+
+ private def waitForConfigOnServer(propName: String, propValue: String,
maxWaitMs: Long = 10000): Unit = {
+ TestUtils.retry(maxWaitMs) {
+ assertEquals(propValue, servers.head.config.originals.get(propName))
+ }
+ }
+
+ private def sendProduceRequest(): ProduceResponse = {
+ val topicPartition = new TopicPartition(topic, 0)
+ val memoryRecords = MemoryRecords.withRecords(CompressionType.NONE, new
SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))
+ val partitionRecords = Map(topicPartition -> memoryRecords)
+ val request = ProduceRequest.Builder.forCurrentMagic(-1, 3000,
partitionRecords.asJava).build()
+ val response = connectAndSend(request, ApiKeys.PRODUCE,
servers.head.socketServer)
+ ProduceResponse.parse(response, request.version)
+ }
+}
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 9c8acb4..41b9055 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -171,6 +171,22 @@ class DynamicBrokerConfigTest extends JUnitSuite {
verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password")
}
+ @Test
+ def testConnectionQuota(): Unit = {
+ verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpProp, "100",
perBrokerConfig = true, expectFailure = false)
+ verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpProp, "100",
perBrokerConfig = false, expectFailure = false)
+ //MaxConnectionsPerIpProp can be set to zero only if
MaxConnectionsPerIpOverridesProp property is set
+ verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpProp, "0",
perBrokerConfig = false, expectFailure = true)
+
+ verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp,
"hostName1:100,hostName2:0", perBrokerConfig = true,
+ expectFailure = false)
+ verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp,
"hostName1:100,hostName2:0", perBrokerConfig = false,
+ expectFailure = false)
+ //test invalid address
+ verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp,
"hostName#:100", perBrokerConfig = true,
+ expectFailure = true)
+ }
+
private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig:
Boolean, expectFailure: Boolean) {
val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect,
port = 8181)
configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret")
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 0ee8d81..927dd1c 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -792,6 +792,8 @@ class KafkaConfigTest {
assertFalse(isValidKafkaConfig(props))
props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:100")
assertTrue(isValidKafkaConfig(props))
+ props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.0#:100")
+ assertFalse(isValidKafkaConfig(props))
}
private def assertPropertyInvalid(validRequiredProps: => Properties, name:
String, values: Any*) {
diff --git a/docs/configuration.html b/docs/configuration.html
index cc3b42c..e5576f9 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -203,6 +203,14 @@
<li><code>background.threads</code></li>
</ul>
+ <h5>Updating ConnectionQuota Configs</h5>
+ The maximum number of connections allowed for a given IP/host by the broker
may be updated dynamically at cluster-default level used by all brokers.
+ The changes will apply for new connection creations and the existing
connections count will be taken into account by the new limits.
+ <ul>
+ <li><code>max.connections.per.ip</code></li>
+ <li><code>max.connections.per.ip.overrides</code></li>
+ </ul>
+
<h5>Adding and Removing Listeners</h5>
<p>Listeners may be added or removed dynamically. When a new listener is
added, security configs of the listener must be provided
as listener configs with the listener prefix
<code>listener.name.{listenerName}.</code>. If the new listener uses SASL,