Repository: kafka Updated Branches: refs/heads/0.9.0 d0e864795 -> 29ebb42fd
KAFKA-3194: Validate security.inter.broker.protocol against the advertised.listeners protocols Author: Grant Henke <[email protected]> Reviewers: Ismael Juma Closes #851 from granthenke/verify-protocol Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/29ebb42f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/29ebb42f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/29ebb42f Branch: refs/heads/0.9.0 Commit: 29ebb42fdf8af4b5983f3b22f13697e0c73e0d5a Parents: d0e8647 Author: Grant Henke <[email protected]> Authored: Wed Feb 3 15:42:22 2016 -0800 Committer: Gwen Shapira <[email protected]> Committed: Wed Feb 3 15:45:26 2016 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/server/KafkaConfig.scala | 7 +++++ .../unit/kafka/network/SocketServerTest.scala | 4 +-- .../unit/kafka/server/KafkaConfigTest.scala | 28 ++++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/29ebb42f/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 5cc10b4..f7af38c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -953,6 +953,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor") require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." 
+ " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(",")) + require(advertisedListeners.keySet.contains(interBrokerSecurityProtocol), + s"${KafkaConfig.InterBrokerSecurityProtocolProp} must be a protocol in the configured set of ${KafkaConfig.AdvertisedListenersProp}. " + + s"The valid options based on currently configured protocols are ${advertisedListeners.keySet}") + require(advertisedListeners.keySet.subsetOf(listeners.keySet), + s"${KafkaConfig.AdvertisedListenersProp} protocols must be equal to or a subset of ${KafkaConfig.ListenersProp} protocols. " + + s"Found ${advertisedListeners.keySet}. The valid options based on currently configured protocols are ${listeners.keySet}" + ) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/29ebb42f/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 01f198e..606d199 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -221,9 +221,9 @@ class SocketServerTest extends JUnitSuite { @Test def testSslSocketServer(): Unit = { val trustStoreFile = File.createTempFile("truststore", ".jks") - val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, enableSsl = true, + val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL), trustStoreFile = Some(trustStoreFile)) - overrideProps.put("listeners", "SSL://localhost:0") + overrideProps.put(KafkaConfig.ListenersProp, "SSL://localhost:0") val serverMetrics = new Metrics val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, new SystemTime) 
http://git-wip-us.apache.org/repos/asf/kafka/blob/29ebb42f/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 9ddc2c1..ac4d6ca 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -383,6 +383,34 @@ class KafkaConfigTest { } @Test + def testInvalidInterBrokerSecurityProtocol() { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + props.put(KafkaConfig.ListenersProp, "SSL://localhost:0") + props.put(KafkaConfig.InterBrokerSecurityProtocolProp, SecurityProtocol.PLAINTEXT.toString) + intercept[IllegalArgumentException] { + KafkaConfig.fromProps(props) + } + } + + @Test + def testEqualAdvertisedListenersProtocol() { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093") + props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093") + KafkaConfig.fromProps(props) + } + + @Test + def testInvalidAdvertisedListenersProtocol() { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + props.put(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093") + props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092") + intercept[IllegalArgumentException] { + KafkaConfig.fromProps(props) + } + } + + @Test def testFromPropsInvalid() { def getBaseProperties(): Properties = { val validRequiredProperties = new Properties()
