Repository: kafka Updated Branches: refs/heads/trunk cd427c9b9 -> 5b375d7bf
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index f998d82..36b52fd 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -323,6 +323,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr def networkClientControlledShutdown(retries: Int): Boolean = { val metadataUpdater = new ManualMetadataUpdater() val networkClient = { + val channelBuilder = ChannelBuilders.create( + config.interBrokerSecurityProtocol, + Mode.CLIENT, + LoginType.SERVER, + config.values, + config.saslMechanismInterBrokerProtocol, + config.saslInterBrokerHandshakeRequestEnable) val selector = new Selector( NetworkReceive.UNLIMITED, config.connectionsMaxIdleMs, @@ -331,7 +338,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr "kafka-server-controlled-shutdown", Map.empty.asJava, false, - ChannelBuilders.create(config.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, config.values) + channelBuilder ) new NetworkClient( selector, http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 26838ca..84f2e12 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -73,6 +73,14 @@ class ReplicaFetcherThread(name: String, // as the metrics tag to avoid metric name conflicts with // more than one fetcher thread to the same broker private val networkClient = { + val 
channelBuilder = ChannelBuilders.create( + brokerConfig.interBrokerSecurityProtocol, + Mode.CLIENT, + LoginType.SERVER, + brokerConfig.values, + brokerConfig.saslMechanismInterBrokerProtocol, + brokerConfig.saslInterBrokerHandshakeRequestEnable + ) val selector = new Selector( NetworkReceive.UNLIMITED, brokerConfig.connectionsMaxIdleMs, @@ -81,7 +89,7 @@ class ReplicaFetcherThread(name: String, "replica-fetcher", Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> fetcherId.toString).asJava, false, - ChannelBuilders.create(brokerConfig.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, brokerConfig.values) + channelBuilder ) new NetworkClient( selector, http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 56dae76..23fcfa6 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -26,6 +26,7 @@ import org.junit.{Before, Test} import scala.collection.JavaConverters._ import scala.collection.mutable.Buffer import org.apache.kafka.common.internals.TopicConstants +import org.apache.kafka.clients.producer.KafkaProducer /** * Integration tests for the new consumer that cover basic usage as well as server failures @@ -268,10 +269,14 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { } protected def sendRecords(numRecords: Int, tp: TopicPartition) { + sendRecords(this.producers(0), numRecords, tp) + } + + protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int, tp: TopicPartition) { (0 until numRecords).foreach { i => - this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), i.toLong, s"key 
$i".getBytes, s"value $i".getBytes)) + producer.send(new ProducerRecord(tp.topic(), tp.partition(), i.toLong, s"key $i".getBytes, s"value $i".getBytes)) } - this.producers(0).flush() + producer.flush() } protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]], http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 49ce748..15eeb63 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -41,7 +41,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { val numServers = 2 overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString) TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol), - trustStoreFile = trustStoreFile).map(KafkaConfig.fromProps(_, overridingProps)) + trustStoreFile = trustStoreFile, saslProperties = saslProperties).map(KafkaConfig.fromProps(_, overridingProps)) } private var consumer1: SimpleConsumer = null @@ -72,7 +72,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = { val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile, - retries = retries, lingerMs = lingerMs, props = props) + saslProperties = saslProperties, retries = retries, lingerMs = lingerMs, props = props) producers += producer producer } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index e2314b3..870caca 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -141,9 +141,9 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { override def setUp { securityProtocol match { case SecurityProtocol.SSL => - startSasl(ZkSasl) + startSasl(ZkSasl, null, null) case _ => - startSasl(Both) + startSasl(Both, List("GSSAPI"), List("GSSAPI")) } super.setUp AclCommand.main(topicBrokerReadAclArgs) http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index d0680b8..de05c9c 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -46,15 +46,15 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { override def generateConfigs() = { val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), - trustStoreFile = trustStoreFile) + trustStoreFile = trustStoreFile, saslProperties = saslProperties) cfgs.foreach(_.putAll(serverConfig)) cfgs.map(KafkaConfig.fromProps) } @Before override def setUp() { - val producerSecurityProps = TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile) - 
val consumerSecurityProps = TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile) + val producerSecurityProps = TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties) + val consumerSecurityProps = TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties) super.setUp() producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer]) producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer]) @@ -66,11 +66,13 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { producers += TestUtils.createNewProducer(brokerList, securityProtocol = this.securityProtocol, trustStoreFile = this.trustStoreFile, + saslProperties = this.saslProperties, props = Some(producerConfig)) for (i <- 0 until consumerCount) { consumers += TestUtils.createNewConsumer(brokerList, securityProtocol = this.securityProtocol, trustStoreFile = this.trustStoreFile, + saslProperties = this.saslProperties, props = Some(consumerConfig)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 8dbb80b..b22ccde 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -355,7 +355,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name) producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Long.MaxValue.toString) val producer = TestUtils.createNewProducer(brokerList, securityProtocol = 
securityProtocol, trustStoreFile = trustStoreFile, - retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps)) + saslProperties = saslProperties, retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps)) (0 until numRecords).foreach { i => producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, s"value $i".getBytes)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala new file mode 100644 index 0000000..d203245 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package kafka.api + +import java.io.File +import org.apache.kafka.common.protocol.SecurityProtocol +import kafka.server.KafkaConfig +import org.junit.Test +import kafka.utils.TestUtils +import scala.collection.JavaConverters._ + +class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslTestHarness { + override protected val zkSaslEnabled = true + override protected val kafkaClientSaslMechanism = "PLAIN" + override protected val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN") + override protected def allKafkaClientSaslMechanisms = List("PLAIN", "GSSAPI") + this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") + override protected def securityProtocol = SecurityProtocol.SASL_SSL + override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) + override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, kafkaServerSaslMechanisms)) + + @Test + def testMultipleBrokerMechanisms() { + + val plainSaslProducer = producers(0) + val plainSaslConsumer = consumers(0) + + val gssapiSaslProperties = kafkaSaslProperties("GSSAPI", kafkaServerSaslMechanisms) + val gssapiSaslProducer = TestUtils.createNewProducer(brokerList, + securityProtocol = this.securityProtocol, + trustStoreFile = this.trustStoreFile, + saslProperties = Some(gssapiSaslProperties)) + producers += gssapiSaslProducer + val gssapiSaslConsumer = TestUtils.createNewConsumer(brokerList, + securityProtocol = this.securityProtocol, + trustStoreFile = this.trustStoreFile, + saslProperties = Some(gssapiSaslProperties)) + consumers += gssapiSaslConsumer + val numRecords = 1000 + var startingOffset = 0 + + // Test SASL/PLAIN producer and consumer + sendRecords(plainSaslProducer, numRecords, tp) + plainSaslConsumer.assign(List(tp).asJava) + plainSaslConsumer.seek(tp, 0) + consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords = numRecords, startingOffset = startingOffset) + val plainCommitCallback = new 
CountConsumerCommitCallback() + plainSaslConsumer.commitAsync(plainCommitCallback) + awaitCommitCallback(plainSaslConsumer, plainCommitCallback) + startingOffset += numRecords + + // Test SASL/GSSAPI producer and consumer + sendRecords(gssapiSaslProducer, numRecords, tp) + gssapiSaslConsumer.assign(List(tp).asJava) + gssapiSaslConsumer.seek(tp, startingOffset) + consumeAndVerifyRecords(consumer = gssapiSaslConsumer, numRecords = numRecords, startingOffset = startingOffset) + val gssapiCommitCallback = new CountConsumerCommitCallback() + gssapiSaslConsumer.commitAsync(gssapiCommitCallback) + awaitCommitCallback(gssapiSaslConsumer, gssapiCommitCallback) + startingOffset += numRecords + + // Test SASL/PLAIN producer and SASL/GSSAPI consumer + sendRecords(plainSaslProducer, numRecords, tp) + gssapiSaslConsumer.assign(List(tp).asJava) + gssapiSaslConsumer.seek(tp, startingOffset) + consumeAndVerifyRecords(consumer = gssapiSaslConsumer, numRecords = numRecords, startingOffset = startingOffset) + startingOffset += numRecords + + // Test SASL/GSSAPI producer and SASL/PLAIN consumer + sendRecords(gssapiSaslProducer, numRecords, tp) + plainSaslConsumer.assign(List(tp).asJava) + plainSaslConsumer.seek(tp, startingOffset) + consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords = numRecords, startingOffset = startingOffset) + + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala new file mode 100644 index 0000000..687cfc3 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license 
agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package kafka.api + +import java.io.File +import org.apache.kafka.common.protocol.SecurityProtocol +import kafka.server.KafkaConfig + +class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslTestHarness { + override protected val zkSaslEnabled = true + override protected val kafkaClientSaslMechanism = "PLAIN" + override protected val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism) + this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") + override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT + override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) + override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, kafkaServerSaslMechanisms)) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/SaslSetup.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index 967cae1..acc86e3 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -19,11 +19,10 @@ package kafka.api 
import java.io.File import javax.security.auth.login.Configuration - import kafka.security.minikdc.MiniKdc import kafka.utils.{JaasTestUtils, TestUtils} import org.apache.kafka.common.security.JaasUtils -import org.apache.kafka.common.security.kerberos.LoginManager +import org.apache.kafka.common.security.authenticator.LoginManager /* * Implements an enumeration for the modes enabled here: @@ -40,35 +39,42 @@ case object Both extends SaslSetupMode trait SaslSetup { private val workDir = TestUtils.tempDir() private val kdcConf = MiniKdc.createConfig - private val kdc = new MiniKdc(kdcConf, workDir) + private var kdc: MiniKdc = null - def startSasl(mode: SaslSetupMode = Both) { + def startSasl(mode: SaslSetupMode = Both, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanisms: List[String]) { // Important if tests leak consumers, producers or brokers LoginManager.closeAll() - val (serverKeytabFile, clientKeytabFile) = createKeytabsAndSetConfiguration(mode) - kdc.start() - kdc.createPrincipal(serverKeytabFile, "kafka/localhost") - kdc.createPrincipal(clientKeytabFile, "client") + val hasKerberos = mode != ZkSasl && (kafkaClientSaslMechanisms.contains("GSSAPI") || kafkaServerSaslMechanisms.contains("GSSAPI")) + if (hasKerberos) { + val serverKeytabFile = TestUtils.tempFile() + val clientKeytabFile = TestUtils.tempFile() + setJaasConfiguration(mode, kafkaServerSaslMechanisms, kafkaClientSaslMechanisms, Some(serverKeytabFile), Some(clientKeytabFile)) + kdc = new MiniKdc(kdcConf, workDir) + kdc.start() + kdc.createPrincipal(serverKeytabFile, "kafka/localhost") + kdc.createPrincipal(clientKeytabFile, "client") + } else { + setJaasConfiguration(mode, kafkaServerSaslMechanisms, kafkaClientSaslMechanisms) + } if (mode == Both || mode == ZkSasl) System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider") } - protected def createKeytabsAndSetConfiguration(mode: SaslSetupMode): (File, File) = { - val serverKeytabFile = 
TestUtils.tempFile() - val clientKeytabFile = TestUtils.tempFile() + protected def setJaasConfiguration(mode: SaslSetupMode, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanisms: List[String], + serverKeytabFile: Option[File] = None, clientKeytabFile: Option[File] = None) { val jaasFile = mode match { case ZkSasl => JaasTestUtils.writeZkFile() - case KafkaSasl => JaasTestUtils.writeKafkaFile(serverKeytabFile, clientKeytabFile) - case Both => JaasTestUtils.writeZkAndKafkaFiles(serverKeytabFile, clientKeytabFile) + case KafkaSasl => JaasTestUtils.writeKafkaFile(kafkaServerSaslMechanisms, kafkaClientSaslMechanisms, serverKeytabFile, clientKeytabFile) + case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerSaslMechanisms, kafkaClientSaslMechanisms, serverKeytabFile, clientKeytabFile) } // This will cause a reload of the Configuration singleton when `getConfiguration` is called Configuration.setConfiguration(null) System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile) - (serverKeytabFile, clientKeytabFile) } def closeSasl() { - kdc.stop() + if (kdc != null) + kdc.stop() // Important if tests leak consumers, producers or brokers LoginManager.closeAll() System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala index b4ae74f..5531919 100644 --- a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala @@ -13,17 +13,27 @@ package kafka.api import kafka.zk.ZooKeeperTestHarness +import kafka.server.KafkaConfig import org.junit.{After, Before} +import java.util.Properties +import scala.collection.JavaConverters._ +import 
org.apache.kafka.common.config.SaslConfigs trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup { protected val zkSaslEnabled: Boolean + protected val kafkaClientSaslMechanism = "GSSAPI" + protected val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism) + + // Override this list to enable client login modules for multiple mechanisms for testing + // of multi-mechanism brokers with clients using different mechanisms in a single JVM + protected def allKafkaClientSaslMechanisms = List(kafkaClientSaslMechanism) @Before override def setUp() { if (zkSaslEnabled) - startSasl(Both) + startSasl(Both, kafkaServerSaslMechanisms, allKafkaClientSaslMechanisms) else - startSasl(KafkaSasl) + startSasl(KafkaSasl, kafkaServerSaslMechanisms, allKafkaClientSaslMechanisms) super.setUp } @@ -33,4 +43,12 @@ trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup { closeSasl() } + def kafkaSaslProperties(kafkaClientSaslMechanism: String, kafkaServerSaslMechanisms: List[String]) = { + val props = new Properties + props.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism) + props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, kafkaClientSaslMechanism) + props.put(SaslConfigs.SASL_ENABLED_MECHANISMS, kafkaServerSaslMechanisms.asJava) + props + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 2ca64f2..8e8ae8b 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -19,7 +19,6 @@ package kafka.integration import java.io.File import java.util.Arrays - import kafka.common.KafkaException import kafka.server._ import 
kafka.utils.{CoreUtils, TestUtils} @@ -27,8 +26,8 @@ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.security.auth.KafkaPrincipal import org.junit.{After, Before} - import scala.collection.mutable.Buffer +import java.util.Properties /** * A test harness that brings up some number of broker nodes @@ -57,6 +56,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT protected def trustStoreFile: Option[File] = None + protected def saslProperties: Option[Properties] = None @Before override def setUp() { http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index fa240d2..f8476cd 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -550,6 +550,8 @@ class KafkaConfigTest { case KafkaConfig.SslCipherSuitesProp => // ignore string //Sasl Configs + case KafkaConfig.SaslMechanismInterBrokerProtocolProp => // ignore + case KafkaConfig.SaslEnabledMechanismsProp => case KafkaConfig.SaslKerberosServiceNameProp => // ignore string case KafkaConfig.SaslKerberosKinitCmdProp => case KafkaConfig.SaslKerberosTicketRenewWindowFactorProp => http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala index a14cd3f..7c4b951 100644 --- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala +++ 
b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala @@ -20,16 +20,14 @@ import java.io.{File, BufferedWriter, FileWriter} object JaasTestUtils { - case class Krb5LoginModule(contextName: String, - useKeyTab: Boolean, + case class Krb5LoginModule(useKeyTab: Boolean, storeKey: Boolean, keyTab: String, principal: String, debug: Boolean, serviceName: Option[String]) { - def toJaasSection: JaasSection = { - JaasSection( - contextName, + def toJaasModule: JaasModule = { + JaasModule( "com.sun.security.auth.module.Krb5LoginModule", debug = debug, entries = Map( @@ -42,15 +40,38 @@ object JaasTestUtils { } } - case class JaasSection(contextName: String, - moduleName: String, - debug: Boolean, - entries: Map[String, String]) { + case class PlainLoginModule(username: String, + password: String, + debug: Boolean = false, + validUsers: Map[String, String] = Map.empty) { + def toJaasModule: JaasModule = { + JaasModule( + "org.apache.kafka.common.security.plain.PlainLoginModule", + debug = debug, + entries = Map( + "username" -> username, + "password" -> password + ) ++ validUsers.map { case (user, pass) => (s"user_$user"-> pass)} + ) + } + } + + case class JaasModule(moduleName: String, + debug: Boolean, + entries: Map[String, String]) { override def toString: String = { - s"""|$contextName { - | $moduleName required + s"""$moduleName required | debug=$debug | ${entries.map { case (k, v) => s"""$k="$v"""" }.mkString("", "\n| ", ";")} + |""" + } + } + + class JaasSection(contextName: String, + jaasModule: Seq[JaasModule]) { + override def toString: String = { + s"""|$contextName { + | ${jaasModule.mkString("\n ")} |}; |""".stripMargin } @@ -67,6 +88,11 @@ object JaasTestUtils { private val KafkaServerPrincipal = "kafka/localhost@EXAMPLE.COM" private val KafkaClientContextName = "KafkaClient" private val KafkaClientPrincipal = "client@EXAMPLE.COM" + + private val KafkaPlainUser = "testuser" + private val KafkaPlainPassword = "testuser-secret" + private val KafkaPlainAdmin = 
"admin" + private val KafkaPlainAdminPassword = "admin-secret" def writeZkFile(): String = { val jaasFile = TestUtils.tempFile() @@ -74,43 +100,65 @@ object JaasTestUtils { jaasFile.getCanonicalPath } - def writeKafkaFile(serverKeyTabLocation: File, clientKeyTabLocation: File): String = { + def writeKafkaFile(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanisms: List[String], serverKeyTabLocation: Option[File], clientKeyTabLocation: Option[File]): String = { val jaasFile = TestUtils.tempFile() - writeToFile(jaasFile, kafkaSections(serverKeyTabLocation, clientKeyTabLocation)) + val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanisms, clientKeyTabLocation)) + writeToFile(jaasFile, kafkaSections) jaasFile.getCanonicalPath } - def writeZkAndKafkaFiles(serverKeyTabLocation: File, clientKeyTabLocation: File): String = { + def writeZkAndKafkaFiles(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanisms: List[String], serverKeyTabLocation: Option[File], clientKeyTabLocation: Option[File]): String = { val jaasFile = TestUtils.tempFile() - writeToFile(jaasFile, kafkaSections(serverKeyTabLocation, clientKeyTabLocation) ++ zkSections) + val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanisms, clientKeyTabLocation)) + writeToFile(jaasFile, kafkaSections ++ zkSections) jaasFile.getCanonicalPath } private def zkSections: Seq[JaasSection] = Seq( - JaasSection(ZkServerContextName, ZkModule, false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)), - JaasSection(ZkClientContextName, ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword)) + new JaasSection(ZkServerContextName, Seq(JaasModule(ZkModule, false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))), + new JaasSection(ZkClientContextName, Seq(JaasModule(ZkModule, false, 
Map("username" -> ZkUser, "password" -> ZkUserPassword)))) ) - private def kafkaSections(serverKeytabLocation: File, clientKeytabLocation: File): Seq[JaasSection] = { - Seq( - Krb5LoginModule( - KafkaServerContextName, - useKeyTab = true, - storeKey = true, - keyTab = serverKeytabLocation.getAbsolutePath, - principal = KafkaServerPrincipal, - debug = true, - serviceName = Some("kafka")), - Krb5LoginModule( - KafkaClientContextName, - useKeyTab = true, - storeKey = true, - keyTab = clientKeytabLocation.getAbsolutePath, - principal = KafkaClientPrincipal, - debug = true, - serviceName = Some("kafka") - ) - ).map(_.toJaasSection) + private def kafkaServerSection(mechanisms: List[String], keytabLocation: Option[File]): JaasSection = { + val modules = mechanisms.map { + case "GSSAPI" => + Krb5LoginModule( + useKeyTab = true, + storeKey = true, + keyTab = keytabLocation.getOrElse(throw new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath, + principal = KafkaServerPrincipal, + debug = true, + serviceName = Some("kafka")).toJaasModule + case "PLAIN" => + PlainLoginModule( + KafkaPlainAdmin, + KafkaPlainAdminPassword, + debug = false, + Map(KafkaPlainAdmin -> KafkaPlainAdminPassword, KafkaPlainUser -> KafkaPlainPassword)).toJaasModule + case mechanism => throw new IllegalArgumentException("Unsupported server mechanism " + mechanism) + } + new JaasSection(KafkaServerContextName, modules) + } + + private def kafkaClientSection(mechanisms: List[String], keytabLocation: Option[File]): JaasSection = { + val modules = mechanisms.map { + case "GSSAPI" => + Krb5LoginModule( + useKeyTab = true, + storeKey = true, + keyTab = keytabLocation.getOrElse(throw new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath, + principal = KafkaClientPrincipal, + debug = true, + serviceName = Some("kafka") + ).toJaasModule + case "PLAIN" => + PlainLoginModule( + KafkaPlainUser, + KafkaPlainPassword + ).toJaasModule + case 
mechanism => throw new IllegalArgumentException("Unsupported client mechanism " + mechanism) + } + new JaasSection(KafkaClientContextName, modules) } private def jaasSectionsToString(jaasSections: Seq[JaasSection]): String = http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 6bd6c63..7df87fc 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -26,12 +26,10 @@ import java.util.{Collections, Properties, Random} import java.security.cert.X509Certificate import javax.net.ssl.X509TrustManager import charset.Charset - import kafka.security.auth.{Acl, Authorizer, Resource} import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.utils.Utils._ import org.apache.kafka.test.TestSslUtils - import scala.collection.mutable.{ArrayBuffer, ListBuffer} import kafka.server._ import kafka.producer._ @@ -146,6 +144,7 @@ object TestUtils extends Logging { enableDeleteTopic: Boolean = false, interBrokerSecurityProtocol: Option[SecurityProtocol] = None, trustStoreFile: Option[File] = None, + saslProperties: Option[Properties] = None, enablePlaintext: Boolean = true, enableSsl: Boolean = false, enableSaslPlaintext: Boolean = false, @@ -153,7 +152,7 @@ object TestUtils extends Logging { rackInfo: Map[Int, String] = Map()): Seq[Properties] = { (0 until numConfigs).map { node => createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, RandomPort, - interBrokerSecurityProtocol, trustStoreFile, enablePlaintext = enablePlaintext, enableSsl = enableSsl, + interBrokerSecurityProtocol, trustStoreFile, saslProperties, enablePlaintext = enablePlaintext, enableSsl = enableSsl, enableSaslPlaintext = 
enableSaslPlaintext, enableSaslSsl = enableSaslSsl, rack = rackInfo.get(node)) } } @@ -173,6 +172,7 @@ object TestUtils extends Logging { port: Int = RandomPort, interBrokerSecurityProtocol: Option[SecurityProtocol] = None, trustStoreFile: Option[File] = None, + saslProperties: Option[Properties] = None, enablePlaintext: Boolean = true, enableSaslPlaintext: Boolean = false, saslPlaintextPort: Int = RandomPort, enableSsl: Boolean = false, sslPort: Int = RandomPort, @@ -211,6 +211,9 @@ object TestUtils extends Logging { if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) }) props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId")) + if (protocolAndPorts.exists { case (protocol, _) => usesSaslTransportLayer(protocol) }) + props.putAll(saslConfigs(saslProperties)) + interBrokerSecurityProtocol.foreach { protocol => props.put(KafkaConfig.InterBrokerSecurityProtocolProp, protocol.name) } @@ -440,16 +443,19 @@ object TestUtils extends Logging { private def securityConfigs(mode: Mode, securityProtocol: SecurityProtocol, trustStoreFile: Option[File], - certAlias: String): Properties = { + certAlias: String, + saslProperties: Option[Properties]): Properties = { val props = new Properties if (usesSslTransportLayer(securityProtocol)) props.putAll(sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile, certAlias)) + if (usesSaslTransportLayer(securityProtocol)) + props.putAll(saslConfigs(saslProperties)) props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name) props } - def producerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File]): Properties = - securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "producer") + def producerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties = + securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "producer", saslProperties) 
/** * Create a (new) producer with a few pre-configured properties. @@ -463,6 +469,7 @@ object TestUtils extends Logging { requestTimeoutMs: Long = 10 * 1024L, securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, trustStoreFile: Option[File] = None, + saslProperties: Option[Properties] = None, keySerializer: Serializer[K] = new ByteArraySerializer, valueSerializer: Serializer[V] = new ByteArraySerializer, props: Option[Properties] = None): KafkaProducer[K, V] = { @@ -493,7 +500,7 @@ object TestUtils extends Logging { * SSL client auth fails. */ if (!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) - producerProps.putAll(producerSecurityConfigs(securityProtocol, trustStoreFile)) + producerProps.putAll(producerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)) new KafkaProducer[K, V](producerProps, keySerializer, valueSerializer) } @@ -503,8 +510,13 @@ object TestUtils extends Logging { case _ => false } - def consumerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File]): Properties = - securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "consumer") + private def usesSaslTransportLayer(securityProtocol: SecurityProtocol): Boolean = securityProtocol match { + case SecurityProtocol.SASL_PLAINTEXT | SecurityProtocol.SASL_SSL => true + case _ => false + } + + def consumerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties = + securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "consumer", saslProperties) /** * Create a new consumer with a few pre-configured properties. 
@@ -517,6 +529,7 @@ object TestUtils extends Logging { sessionTimeout: Int = 30000, securityProtocol: SecurityProtocol, trustStoreFile: Option[File] = None, + saslProperties: Option[Properties] = None, props: Option[Properties] = None) : KafkaConsumer[Array[Byte],Array[Byte]] = { import org.apache.kafka.clients.consumer.ConsumerConfig @@ -545,7 +558,7 @@ object TestUtils extends Logging { * SSL client auth fails. */ if(!consumerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) - consumerProps.putAll(consumerSecurityConfigs(securityProtocol, trustStoreFile)) + consumerProps.putAll(consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)) new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps) } @@ -1058,6 +1071,13 @@ object TestUtils extends Logging { sslProps } + def saslConfigs(saslProperties: Option[Properties]): Properties = { + saslProperties match { + case Some(properties) => properties + case None => new Properties + } + } + // a X509TrustManager to trust self-signed certs for unit tests. def trustAllCerts: X509TrustManager = { val trustManager = new X509TrustManager() {
