Repository: kafka Updated Branches: refs/heads/trunk 79eacf6c9 -> 5b5869383
KAFKA-3194: Validate security.inter.broker.protocol against the advertised.listeners protocols Author: Grant Henke <[email protected]> Reviewers: Ismael Juma Closes #851 from granthenke/verify-protocol Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b586938 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b586938 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b586938 Branch: refs/heads/trunk Commit: 5b5869383832512d78e20183c855f83d30a5ab37 Parents: 79eacf6 Author: Grant Henke <[email protected]> Authored: Wed Feb 3 15:42:22 2016 -0800 Committer: Gwen Shapira <[email protected]> Committed: Wed Feb 3 15:42:22 2016 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/server/KafkaConfig.scala | 7 +++++ .../unit/kafka/network/SocketServerTest.scala | 4 +-- .../unit/kafka/server/KafkaConfigTest.scala | 28 ++++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5b586938/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 00bf0cb..2c6311c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -951,6 +951,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor") require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." 
+ " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(",")) + require(advertisedListeners.keySet.contains(interBrokerSecurityProtocol), + s"${KafkaConfig.InterBrokerSecurityProtocolProp} must be a protocol in the configured set of ${KafkaConfig.AdvertisedListenersProp}. " + + s"The valid options based on currently configured protocols are ${advertisedListeners.keySet}") + require(advertisedListeners.keySet.subsetOf(listeners.keySet), + s"${KafkaConfig.AdvertisedListenersProp} protocols must be equal to or a subset of ${KafkaConfig.ListenersProp} protocols. " + + s"Found ${advertisedListeners.keySet}. The valid options based on currently configured protocols are ${listeners.keySet}" + ) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5b586938/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index b4ba027..d94c314 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -233,9 +233,9 @@ class SocketServerTest extends JUnitSuite { @Test def testSslSocketServer(): Unit = { val trustStoreFile = File.createTempFile("truststore", ".jks") - val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, enableSsl = true, + val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL), trustStoreFile = Some(trustStoreFile)) - overrideProps.put("listeners", "SSL://localhost:0") + overrideProps.put(KafkaConfig.ListenersProp, "SSL://localhost:0") val serverMetrics = new Metrics val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, new SystemTime) 
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b586938/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index e8ffb5b..2479b37 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -393,6 +393,34 @@ class KafkaConfigTest { } @Test + def testInvalidInterBrokerSecurityProtocol() { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + props.put(KafkaConfig.ListenersProp, "SSL://localhost:0") + props.put(KafkaConfig.InterBrokerSecurityProtocolProp, SecurityProtocol.PLAINTEXT.toString) + intercept[IllegalArgumentException] { + KafkaConfig.fromProps(props) + } + } + + @Test + def testEqualAdvertisedListenersProtocol() { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093") + props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093") + KafkaConfig.fromProps(props) + } + + @Test + def testInvalidAdvertisedListenersProtocol() { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + props.put(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093") + props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092") + intercept[IllegalArgumentException] { + KafkaConfig.fromProps(props) + } + } + + @Test def testFromPropsInvalid() { def getBaseProperties(): Properties = { val validRequiredProperties = new Properties()
