This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new b419d1c MINOR: KIP-631 KafkaConfig fixes and improvements (#10114)
b419d1c is described below
commit b419d1cf48f7e3b122cbbf963f7827c71e0c11de
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Feb 11 21:35:24 2021 -0800
MINOR: KIP-631 KafkaConfig fixes and improvements (#10114)
Add the new KIP-631 configs to KafkaConfigTest to fix the test failure.
Rename InitialBrokerRegistrationTimeoutMs to
InitialBrokerRegistrationTimeoutMsProp for consistency with the other
properties.
Add ControllerListenerNamesProp as specified in KIP-631.
Give nodeId and brokerId the same value in KafkaConfig.
Reviewers: David Arthur <[email protected]>
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 59 ++++++++++++------
.../kafka/server/BrokerLifecycleManagerTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 71 ++++++++++++++++++++++
3 files changed, 114 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 0ecb48c..2fd04ae 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -372,11 +372,12 @@ object KafkaConfig {
/** KIP-500 Configuration */
val ProcessRolesProp = "process.roles"
- val InitialBrokerRegistrationTimeoutMs =
"initial.broker.registration.timeout.ms"
+ val InitialBrokerRegistrationTimeoutMsProp =
"initial.broker.registration.timeout.ms"
val BrokerHeartbeatIntervalMsProp = "broker.heartbeat.interval.ms"
val BrokerSessionTimeoutMsProp = "broker.session.timeout.ms"
val NodeIdProp = "node.id"
val MetadataLogDirProp = "metadata.log.dir"
+ val ControllerListenerNamesProp = "controller.listener.names"
/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
@@ -672,6 +673,8 @@ object KafkaConfig {
"This is required configuration when the self-managed quorum is enabled."
val MetadataLogDirDoc = "This configuration determines where we put the
metadata log for clusters upgraded to " +
"KIP-500. If it is not set, the metadata log is placed in the first log
directory from log.dirs."
+ val ControllerListenerNamesDoc = "A comma-separated list of the names of the
listeners used by the KIP-500 controller. This is required " +
+ "if this process is a KIP-500 controller. The ZK-based controller will not
use this configuration."
/************* Authorizer Configuration ***********/
val AuthorizerClassNameDoc = s"The fully qualified name of a class that
implements s${classOf[Authorizer].getName}" +
@@ -1073,10 +1076,11 @@ object KafkaConfig {
*/
.defineInternal(ProcessRolesProp, LIST, Collections.emptyList(),
ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc)
.defineInternal(NodeIdProp, INT, Defaults.EmptyNodeId, null, HIGH,
NodeIdDoc)
- .defineInternal(InitialBrokerRegistrationTimeoutMs, INT,
Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM,
InitialBrokerRegistrationTimeoutMsDoc)
+ .defineInternal(InitialBrokerRegistrationTimeoutMsProp, INT,
Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM,
InitialBrokerRegistrationTimeoutMsDoc)
.defineInternal(BrokerHeartbeatIntervalMsProp, INT,
Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, BrokerHeartbeatIntervalMsDoc)
.defineInternal(BrokerSessionTimeoutMsProp, INT,
Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
.defineInternal(MetadataLogDirProp, STRING, null, null, HIGH,
MetadataLogDirDoc)
+ .defineInternal(ControllerListenerNamesProp, STRING, null, null, HIGH,
ControllerListenerNamesDoc)
/************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName,
LOW, AuthorizerClassNameDoc)
@@ -1506,10 +1510,17 @@ class KafkaConfig(val props: java.util.Map[_, _],
doLog: Boolean, dynamicConfigO
/** ********* General Configuration ***********/
val brokerIdGenerationEnable: Boolean =
getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
- var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
- val nodeId: Int = getInt(KafkaConfig.NodeIdProp)
+ var brokerId: Int = {
+ val nodeId = getInt(KafkaConfig.NodeIdProp)
+ if (nodeId < 0) {
+ getInt(KafkaConfig.BrokerIdProp)
+ } else {
+ nodeId
+ }
+ }
+ val nodeId: Int = brokerId
val processRoles: Set[ProcessRole] = parseProcessRoles()
- val initialRegistrationTimeoutMs: Int =
getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMs)
+ val initialRegistrationTimeoutMs: Int =
getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp)
val brokerHeartbeatIntervalMs: Int =
getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
val brokerSessionTimeoutMs: Int =
getInt(KafkaConfig.BrokerSessionTimeoutMsProp)
@@ -1797,6 +1808,12 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
}.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName +
":" + port, listenerSecurityProtocolMap))
}
+ def controllerListenerNames: Seq[String] =
+
Option(getString(KafkaConfig.ControllerListenerNamesProp)).getOrElse("").split(",")
+
+ def controllerListeners: Seq[EndPoint] =
+ listeners.filter(l =>
controllerListenerNames.contains(l.listenerName.value()))
+
def controlPlaneListener: Option[EndPoint] = {
controlPlaneListenerName.map { listenerName =>
listeners.filter(endpoint => endpoint.listenerName.value() ==
listenerName.value()).head
@@ -1804,9 +1821,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
}
def dataPlaneListeners: Seq[EndPoint] = {
- Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {
- case Some(controlPlaneListenerName) =>
listeners.filterNot(_.listenerName.value() == controlPlaneListenerName)
- case None => listeners
+ listeners.filterNot { listener =>
+ val name = listener.listenerName.value()
+ name.equals(getString(KafkaConfig.ControlPlaneListenerNameProp)) ||
+ controllerListenerNames.contains(name)
}
}
@@ -1820,7 +1838,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
else if (getString(KafkaConfig.AdvertisedHostNameProp) != null ||
getInt(KafkaConfig.AdvertisedPortProp) != null)
CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName +
":" + advertisedPort, listenerSecurityProtocolMap, requireDistinctPorts=false)
else
- listeners
+ listeners.filterNot(l =>
controllerListenerNames.contains(l.listenerName.value()))
}
private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName,
SecurityProtocol) = {
@@ -1896,18 +1914,25 @@ class KafkaConfig(val props: java.util.Map[_, _],
doLog: Boolean, dynamicConfigO
val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
val listenerNames = listeners.map(_.listenerName).toSet
- require(advertisedListenerNames.contains(interBrokerListenerName),
- s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name
defined in ${KafkaConfig.AdvertisedListenersProp}. " +
- s"The valid options based on currently configured listeners are
${advertisedListenerNames.map(_.value).mkString(",")}")
- require(advertisedListenerNames.subsetOf(listenerNames),
- s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal to
or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " +
- s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid
options based on the current configuration " +
- s"are ${listenerNames.map(_.value).mkString(",")}"
- )
+ if (processRoles.isEmpty || processRoles.contains(BrokerRole)) {
+ require(advertisedListenerNames.contains(interBrokerListenerName),
+ s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name
defined in ${KafkaConfig.AdvertisedListenersProp}. " +
+ s"The valid options based on currently configured listeners are
${advertisedListenerNames.map(_.value).mkString(",")}")
+ require(advertisedListenerNames.subsetOf(listenerNames),
+ s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal
to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " +
+ s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The
valid options based on the current configuration " +
+ s"are ${listenerNames.map(_.value).mkString(",")}"
+ )
+ }
+
require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable
meta-address 0.0.0.0. "+
s"Use a routable IP address.")
+ // Ensure controller listeners are not in the advertised listeners list
+ require(!controllerListeners.exists(advertisedListeners.contains),
+ s"${KafkaConfig.AdvertisedListenersProp} cannot contain any of
${KafkaConfig.ControllerListenerNamesProp}")
+
// validate controller.listener.name config
if (controlPlaneListenerName.isDefined) {
require(advertisedListenerNames.contains(controlPlaneListenerName.get),
diff --git
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index a823ce6..7544a46 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -46,7 +46,7 @@ class BrokerLifecycleManagerTest {
properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo")
properties.setProperty(KafkaConfig.ProcessRolesProp, "broker")
properties.setProperty(KafkaConfig.NodeIdProp, "1")
- properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMs,
"300000")
+ properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp,
"300000")
properties
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 427bc13..d6c456b 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -262,6 +262,30 @@ class KafkaConfigTest {
}
@Test
+ def testControllerListenerName() = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+ props.put(KafkaConfig.ListenersProp,
"PLAINTEXT://localhost:0,CONTROLPLANE://localhost:4000,CONTROLLER://localhost:5000")
+ props.put(KafkaConfig.ListenerSecurityProtocolMapProp,
"PLAINTEXT:PLAINTEXT,CONTROLPLANE:SSL,CONTROLLER:SASL_SSL")
+ props.put(KafkaConfig.AdvertisedListenersProp,
"PLAINTEXT://localhost:0,CONTROLPLANE://localhost:4000")
+ props.put(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLPLANE")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ assertTrue(isValidKafkaConfig(props))
+
+ val serverConfig = KafkaConfig.fromProps(props)
+ val controlPlaneEndpoint = serverConfig.controlPlaneListener.get
+ assertEquals("localhost", controlPlaneEndpoint.host)
+ assertEquals(4000, controlPlaneEndpoint.port)
+ assertEquals(SecurityProtocol.SSL, controlPlaneEndpoint.securityProtocol)
+
+ val controllerEndpoints = serverConfig.controllerListeners
+ assertEquals(1, controllerEndpoints.size)
+ val controllerEndpoint = controllerEndpoints.iterator.next()
+ assertEquals("localhost", controllerEndpoint.host)
+ assertEquals(5000, controllerEndpoint.port)
+ assertEquals(SecurityProtocol.SASL_SSL,
controllerEndpoint.securityProtocol)
+ }
+
+ @Test
def testBadListenerProtocol(): Unit = {
val props = new Properties()
props.put(KafkaConfig.BrokerIdProp, "1")
@@ -619,8 +643,13 @@ class KafkaConfigTest {
case KafkaConfig.ConnectionSetupTimeoutMaxMsProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
// KIP-500 Configurations
+ case KafkaConfig.ProcessRolesProp => // ignore
+ case KafkaConfig.InitialBrokerRegistrationTimeoutMsProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case KafkaConfig.BrokerHeartbeatIntervalMsProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case KafkaConfig.BrokerSessionTimeoutMsProp =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.NodeIdProp => assertPropertyInvalid(baseProperties,
name, "not_a_number")
case KafkaConfig.MetadataLogDirProp => // ignore string
+ case KafkaConfig.ControllerListenerNamesProp => // ignore string
case KafkaConfig.AuthorizerClassNameProp => //ignore string
case KafkaConfig.CreateTopicPolicyClassNameProp => //ignore string
@@ -962,6 +991,48 @@ class KafkaConfigTest {
}
}
+ def assertDistinctControllerAndAdvertisedListeners(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= TestUtils.MockZkPort)
+ val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+ props.put(KafkaConfig.ListenersProp, listeners)
+ props.put(KafkaConfig.AdvertisedListenersProp,
"PLAINTEXT://A:9092,SSL://B:9093")
+ // Valid now
+ assertTrue(isValidKafkaConfig(props))
+
+ // Still valid
+ val controllerListeners = "SASL_SSL"
+ props.put(KafkaConfig.ControllerListenerNamesProp, controllerListeners)
+ assertTrue(isValidKafkaConfig(props))
+ }
+
+ @Test
+ def assertAllControllerListenerCannotBeAdvertised(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= TestUtils.MockZkPort)
+ val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+ props.put(KafkaConfig.ListenersProp, listeners)
+ props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+ // Valid now
+ assertTrue(isValidKafkaConfig(props))
+
+ // Invalid now
+ props.put(KafkaConfig.ControllerListenerNamesProp,
"PLAINTEXT,SSL,SASL_SSL")
+ assertFalse(isValidKafkaConfig(props))
+ }
+
+ @Test
+ def assertEvenOneControllerListenerCannotBeAdvertised(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= TestUtils.MockZkPort)
+ val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+ props.put(KafkaConfig.ListenersProp, listeners)
+ props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+ // Valid now
+ assertTrue(isValidKafkaConfig(props))
+
+ // Invalid now
+ props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+ assertFalse(isValidKafkaConfig(props))
+ }
+
@Test
def testInvalidQuorumVotersConfig(): Unit = {
assertInvalidQuorumVoters("1")