Repository: kafka
Updated Branches:
  refs/heads/trunk 1dd558aec -> 4a8acdf9d
KAFKA-2732; Add class for ZK Auth. Author: Flavio Junqueira <f...@apache.org> Reviewers: Ismael Juma <ism...@juma.me.uk>, Ben Stopford <benstopf...@gmail.com>, Jun Rao <jun...@gmail.com> Closes #410 from fpj/KAFKA-2732 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a8acdf9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a8acdf9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a8acdf9 Branch: refs/heads/trunk Commit: 4a8acdf9d7135a50b751a0bea9e205ea4a071e5c Parents: 1dd558a Author: Flavio Junqueira <f...@apache.org> Authored: Sat Nov 28 08:23:35 2015 -0800 Committer: Jun Rao <jun...@gmail.com> Committed: Sat Nov 28 08:23:35 2015 -0800 ---------------------------------------------------------------------- .../kafka/api/EndToEndAuthorizationTest.scala | 273 +++++++++++++++++++ .../kafka/api/IntegrationTestHarness.scala | 19 +- .../kafka/api/SaslPlaintextConsumerTest.scala | 1 + .../scala/integration/kafka/api/SaslSetup.scala | 87 ++++++ .../kafka/api/SaslSslConsumerTest.scala | 4 + .../api/SaslSslEndToEndAuthorizationTest.scala | 25 ++ .../integration/kafka/api/SaslTestHarness.scala | 59 +--- .../api/SslEndToEndAuthorizationTest.scala | 29 ++ .../integration/KafkaServerTestHarness.scala | 20 +- .../SaslPlaintextTopicMetadataTest.scala | 1 + .../integration/SaslSslTopicMetadataTest.scala | 1 + .../server/SaslPlaintextReplicaFetchTest.scala | 1 + .../kafka/server/SaslSslReplicaFetchTest.scala | 1 + .../scala/unit/kafka/utils/JaasTestUtils.scala | 65 ++++- .../test/scala/unit/kafka/utils/TestUtils.scala | 59 ++-- 15 files changed, 554 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala new file mode 100644 index 0000000..59cff14 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.api + +import java.io.File +import java.util.ArrayList +import java.util.concurrent.ExecutionException + +import kafka.admin.AclCommand +import kafka.common.TopicAndPartition +import kafka.security.auth._ +import kafka.server._ +import kafka.utils._ + +import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerConfig} +import org.apache.kafka.clients.producer.{ProducerRecord, ProducerConfig} +import org.apache.kafka.common.security.auth.KafkaPrincipal +import org.apache.kafka.common.{TopicPartition} +import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.errors.{GroupAuthorizationException,TopicAuthorizationException} +import org.junit.Assert._ +import org.junit.{Test, After, Before} + +import scala.collection.JavaConverters._ + + +/** + * The test cases here verify that a producer authorized to publish to a topic + * is able to, and that consumers in a group authorized to consume are able to + * to do so. + * + * This test relies on a chain of test harness traits to set up. It directly + * extends IntegrationTestHarness. IntegrationTestHarness creates producers and + * consumers, and it extends KafkaServerTestHarness. KafkaServerTestHarness starts + * brokers, but first it initializes a ZooKeeper server and client, which happens + * in ZooKeeperTestHarness. + * + * To start brokers when the security protocol is SASL_SSL, we need to set a cluster + * ACL, which happens optionally in KafkaServerTestHarness. If the security protocol + * is SSL or PLAINTEXT, then the ACL isn't set. The remaining ACLs to enable access + * to producers and consumers are set here. To set ACLs, we use AclCommand directly. + * + * Finally, we rely on SaslSetup to bootstrap and setup Kerberos. We don't use + * SaslTestHarness here directly because it extends ZooKeeperTestHarness, and we + * would end up with ZooKeeperTestHarness twice. + */ +trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { + override val producerCount = 1 + override val consumerCount = 2 + override val serverCount = 3 + override val setClusterAcl = Some(() => + { AclCommand.main(clusterAclArgs) + TestUtils.waitAndVerifyAcls(ClusterActionAcl, servers.head.apis.authorizer.get, clusterResource) + } : Unit + ) + val numRecords = 1 + val group = "group" + val topic = "e2etopic" + val part = 0 + val tp = new TopicPartition(topic, part) + val topicAndPartition = new TopicAndPartition(topic, part) + val clientPrincipal: String + val kafkaPrincipal: String + + override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) + + val topicResource = new Resource(Topic, topic) + val groupResource = new Resource(Group, group) + val clusterResource = Resource.ClusterResource + + // Arguments to AclCommand to set ACLs. 
There are three definitions here: + // 1- Provides read and write access to topic + // 2- Provides only write access to topic + // 3- Provides read access to consumer group + def clusterAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--cluster", + s"--operation=ClusterAction", + s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") + def produceAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$topic", + s"--producer", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def consumeAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$topic", + s"--group=$group", + s"--consumer", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def groupAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--group=$group", + s"--operation=Read", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def ClusterActionAcl:Set[Acl] = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction)) + def GroupReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read)) + def TopicReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read)) + def TopicWriteAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Write)) + def TopicDescribeAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Describe)) + // The next two configuration parameters enable ZooKeeper secure ACLs + // and sets the Kafka authorizer, both necessary to enable security. + this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") + this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName) + // Some needed configuration for brokers, producers, and consumers + this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") + this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") + this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1") + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group") + + /** + * Starts MiniKDC and only then sets up the parent trait. + */ + @Before + override def setUp { + securityProtocol match { + case SecurityProtocol.SSL => + startSasl(ZkSasl) + case _ => + startSasl(Both) + } + super.setUp + // create the test topic with all the brokers as replicas + TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers) + } + + /** + * Closes MiniKDC last when tearing down. + */ + @After + override def tearDown { + super.tearDown + closeSasl() + } + + /** + * Tests the ability of producing and consuming with the appropriate ACLs set. 
+ */ + @Test + def testProduceConsume { + AclCommand.main(produceAclArgs) + AclCommand.main(consumeAclArgs) + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, servers.head.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) + //Produce records + debug("Starting to send records") + sendRecords(numRecords, tp) + //Consume records + debug("Finished sending and starting to consume records") + consumers.head.assign(List(tp).asJava) + consumeRecords(this.consumers.head) + debug("Finished consuming") + } + + /** + * Tests that a producer fails to publish messages when the appropriate ACL + * isn't set. + */ + @Test + def testNoProduceAcl { + //Produce records + debug("Starting to send records") + try{ + sendRecords(numRecords, tp) + fail("Topic authorization exception expected") + } catch { + case e: TopicAuthorizationException => //expected + } + } + + /** + * Tests that a consumer fails to consume messages without the appropriate + * ACL set. + */ + @Test + def testNoConsumeAcl { + AclCommand.main(produceAclArgs) + AclCommand.main(groupAclArgs) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, servers.head.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) + //Produce records + debug("Starting to send records") + sendRecords(numRecords, tp) + //Consume records + debug("Finished sending and starting to consume records") + consumers.head.assign(List(tp).asJava) + try{ + consumeRecords(this.consumers.head) + fail("Topic authorization exception expected") + } catch { + case e: TopicAuthorizationException => //expected + } + } + + /** + * Tests that a consumer fails to consume messages without the appropriate + * ACL set. 
+ */ + @Test + def testNoGroupAcl { + AclCommand.main(produceAclArgs) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, servers.head.apis.authorizer.get, topicResource) + //Produce records + debug("Starting to send records") + sendRecords(numRecords, tp) + //Consume records + debug("Finished sending and starting to consume records") + consumers.head.assign(List(tp).asJava) + try{ + consumeRecords(this.consumers.head) + fail("Topic authorization exception expected") + } catch { + case e: GroupAuthorizationException => //expected + } + } + + private def sendRecords(numRecords: Int, tp: TopicPartition) { + val futures = (0 until numRecords).map { i => + val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes) + debug(s"Sending this record: $record") + this.producers.head.send(record) + } + try { + futures.foreach(_.get) + } catch { + case e: ExecutionException => throw e.getCause + } + } + + private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], + numRecords: Int = 1, + startingOffset: Int = 0, + topic: String = topic, + part: Int = part) { + val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() + val maxIters = numRecords * 50 + var iters = 0 + while (records.size < numRecords) { + for (record <- consumer.poll(50).asScala) { + records.add(record) + } + if (iters > maxIters) + throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.") + iters += 1 + } + for (i <- 0 until numRecords) { + val record = records.get(i) + val offset = startingOffset + i + assertEquals(topic, record.topic()) + assertEquals(part, record.partition()) + assertEquals(offset.toLong, record.offset()) + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index b7ecc9d..7aaa185 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -53,19 +53,26 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { @Before override def setUp() { + val producerSecurityProps = TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile) + val consumerSecurityProps = TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile) super.setUp() - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer]) producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer]) - producerConfig.putAll(TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile)) - consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + producerConfig.putAll(producerSecurityProps) consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer]) consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer]) - consumerConfig.putAll(TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile)) + 
consumerConfig.putAll(consumerSecurityProps) for (i <- 0 until producerCount) - producers += new KafkaProducer(producerConfig) + producers += TestUtils.createNewProducer(brokerList, + acks = 1, + securityProtocol = this.securityProtocol, + trustStoreFile = this.trustStoreFile, + props = Some(producerConfig)) for (i <- 0 until consumerCount) { - consumers += new KafkaConsumer(consumerConfig) + consumers += TestUtils.createNewConsumer(brokerList, + securityProtocol = this.securityProtocol, + trustStoreFile = this.trustStoreFile, + props = Some(consumerConfig)) } // create the consumer offset topic http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala index e6f0c2b..0d11dc1 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala @@ -15,5 +15,6 @@ package kafka.api import org.apache.kafka.common.protocol.SecurityProtocol class SaslPlaintextConsumerTest extends BaseConsumerTest with SaslTestHarness { + override protected val zkSaslEnabled = false override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT } http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/SaslSetup.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala new file mode 100644 index 0000000..d49b5bd --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.io.{File} +import javax.security.auth.login.Configuration + +import kafka.utils.{JaasTestUtils,TestUtils} +import org.apache.hadoop.minikdc.MiniKdc +import org.apache.kafka.common.security.JaasUtils +import org.apache.kafka.common.security.kerberos.LoginManager + +/* + * Implements an enumeration for the modes enabled here: + * zk only, kafka only, both. + */ +sealed trait SaslSetupMode +case object ZkSasl extends SaslSetupMode +case object KafkaSasl extends SaslSetupMode +case object Both extends SaslSetupMode + +/* + * Trait used in SaslTestHarness and EndToEndAuthorizationTest + * currently to setup a keytab and jaas files. 
+ */ +trait SaslSetup { + private val workDir = new File(System.getProperty("test.dir", "target")) + private val kdcConf = MiniKdc.createConf() + private val kdc = new MiniKdc(kdcConf, workDir) + + def startSasl(mode: SaslSetupMode = Both) { + // Important if tests leak consumers, producers or brokers + LoginManager.closeAll() + val keytabFile = createKeytabAndSetConfiguration(mode) + kdc.start() + kdc.createPrincipal(keytabFile, "client", "kafka/localhost") + if (mode == Both || mode == ZkSasl) + System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider") + } + + protected def createKeytabAndSetConfiguration(mode: SaslSetupMode): File = { + val (keytabFile, jaasFile) = createKeytabAndJaasFiles(mode) + // This will cause a reload of the Configuration singleton when `getConfiguration` is called + Configuration.setConfiguration(null) + System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath) + keytabFile + } + + private def createKeytabAndJaasFiles(mode: SaslSetupMode): (File, File) = { + val keytabFile = TestUtils.tempFile() + val jaasFileName: String = mode match { + case ZkSasl => + JaasTestUtils.genZkFile + case KafkaSasl => + JaasTestUtils.genKafkaFile(keytabFile.getAbsolutePath) + case _ => + JaasTestUtils.genZkAndKafkaFile(keytabFile.getAbsolutePath) + } + val jaasFile = new File(jaasFileName) + + (keytabFile, jaasFile) + } + + def closeSasl() { + kdc.stop() + // Important if tests leak consumers, producers or brokers + LoginManager.closeAll() + System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) + System.clearProperty("zookeeper.authProvider.1"); + Configuration.setConfiguration(null) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala index 4f8512a..388ddaf 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala @@ -15,8 +15,12 @@ package kafka.api import java.io.File import org.apache.kafka.common.protocol.SecurityProtocol +import kafka.server.KafkaConfig class SaslSslConsumerTest extends BaseConsumerTest with SaslTestHarness { + override protected val zkSaslEnabled = true + this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) + } http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala new file mode 100644 index 0000000..0ca0904 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.api + +import org.apache.kafka.common.protocol.SecurityProtocol + +class SaslSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { + override protected def securityProtocol = SecurityProtocol.SASL_SSL + override val clientPrincipal = "client" + override val kafkaPrincipal = "kafka" +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala index 14ceb43..b4ae74f 100644 --- a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala @@ -12,70 +12,25 @@ */ package kafka.api -import java.io.{FileWriter, BufferedWriter, File} -import javax.security.auth.login.Configuration - -import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness -import org.apache.hadoop.minikdc.MiniKdc -import org.apache.kafka.common.security.JaasUtils -import org.apache.kafka.common.security.kerberos.LoginManager import org.junit.{After, Before} -trait SaslTestHarness extends ZooKeeperTestHarness { - - private val workDir = new File(System.getProperty("test.dir", "target")) - private val kdcConf = MiniKdc.createConf() - private val kdc = new MiniKdc(kdcConf, workDir) +trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup { + protected val zkSaslEnabled: Boolean @Before override def setUp() { - // Important if tests leak consumers, producers or brokers - LoginManager.closeAll() - val keytabFile = createKeytabAndSetConfiguration("kafka_jaas.conf") - kdc.start() - kdc.createPrincipal(keytabFile, "client", "kafka/localhost") + if (zkSaslEnabled) + startSasl(Both) + else + startSasl(KafkaSasl) super.setUp } - protected def createKeytabAndSetConfiguration(jaasResourceName: String): File = { - val (keytabFile, jaasFile) = createKeytabAndJaasFiles(jaasResourceName) - // This will cause a reload of the Configuration singleton when `getConfiguration` is called - Configuration.setConfiguration(null) - System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath) - keytabFile - } - - private def createKeytabAndJaasFiles(jaasResourceName: String): (File, File) = { - val keytabFile = TestUtils.tempFile() - val jaasFile = TestUtils.tempFile() - - val writer = new BufferedWriter(new FileWriter(jaasFile)) - val inputStream = Thread.currentThread().getContextClassLoader.getResourceAsStream(jaasResourceName) - if (inputStream == null) - throw new IllegalStateException(s"Could not find `$jaasResourceName`, make sure it is in the classpath") - val source = io.Source.fromInputStream(inputStream, "UTF-8") - - for (line <- source.getLines) { - val replaced = line.replaceAll("\\$keytab-location", keytabFile.getAbsolutePath) - writer.write(replaced) - 
writer.newLine() - } - - writer.close() - source.close() - - (keytabFile, jaasFile) - } - @After override def tearDown() { super.tearDown - kdc.stop() - // Important if tests leak consumers, producers or brokers - LoginManager.closeAll() - System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) - Configuration.setConfiguration(null) + closeSasl() } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala new file mode 100644 index 0000000..15e8527 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import org.apache.kafka.common.config.SslConfigs +import org.apache.kafka.common.protocol.SecurityProtocol + + +class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { + override protected def securityProtocol = SecurityProtocol.SSL + this.serverConfig.setProperty(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required") + override val clientPrincipal = "O=client,CN=localhost" + override val kafkaPrincipal = "O=server,CN=localhost" +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index e9784e0..870b9ad 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -25,6 +25,7 @@ import kafka.server._ import kafka.utils.{CoreUtils, TestUtils} import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.security.auth.KafkaPrincipal import org.junit.{After, Before} import scala.collection.mutable.Buffer @@ -37,6 +38,8 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { var servers: Buffer[KafkaServer] = null var brokerList: String = null var alive: Array[Boolean] = null + val kafkaPrincipalType = KafkaPrincipal.USER_TYPE + val setClusterAcl: Option[() => Unit] = None /** * Implementations must override this method to return a set of KafkaConfigs. 
This method will be invoked for every @@ -58,12 +61,27 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { @Before override def setUp() { super.setUp - if(configs.size <= 0) + if (configs.size <= 0) throw new KafkaException("Must supply at least one server config.") servers = configs.map(TestUtils.createServer(_)).toBuffer brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol) alive = new Array[Boolean](servers.length) Arrays.fill(alive, true) + // We need to set a cluster ACL in some cases here + // because of the topic creation in the setup of + // IntegrationTestHarness. If we don't, then tests + // fail with a cluster action authorization exception + // when processing an update metadata request + // (controller -> broker). + // + // The following method does nothing by default, but + // if the test case requires setting up a cluster ACL, + // then it needs to be implemented. + setClusterAcl match { + case Some(f) => + f() + case None => // Nothing to do + } } @After http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala index 11d6da4..888207c 100644 --- a/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala @@ -21,6 +21,7 @@ import kafka.api.SaslTestHarness import org.apache.kafka.common.protocol.SecurityProtocol class SaslPlaintextTopicMetadataTest extends BaseTopicMetadataTest with SaslTestHarness { + override protected val zkSaslEnabled = false protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT protected def trustStoreFile = None } http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala index ea15419..f313280 100644 --- a/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala @@ -23,6 +23,7 @@ import kafka.api.SaslTestHarness import org.apache.kafka.common.protocol.SecurityProtocol class SaslSslTopicMetadataTest extends BaseTopicMetadataTest with SaslTestHarness { + override protected val zkSaslEnabled = false protected def securityProtocol = SecurityProtocol.SASL_SSL protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) } http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala index 740db37..435a0f0 100644 --- a/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala @@ -21,6 +21,7 @@ import kafka.api.SaslTestHarness import 
org.apache.kafka.common.protocol.SecurityProtocol class SaslPlaintextReplicaFetchTest extends BaseReplicaFetchTest with SaslTestHarness { + override protected val zkSaslEnabled = false protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT protected def trustStoreFile = None } http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala index 1bcf8ac..3af8485 100644 --- a/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala @@ -23,6 +23,7 @@ import kafka.api.SaslTestHarness import org.apache.kafka.common.protocol.SecurityProtocol class SaslSslReplicaFetchTest extends BaseReplicaFetchTest with SaslTestHarness { + override protected val zkSaslEnabled = false protected def securityProtocol = SecurityProtocol.SASL_SSL protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) } http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala index 78a74d5..cf08830 100644 --- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala @@ -18,23 +18,70 @@ package kafka.utils object JaasTestUtils { - val serverContextName = "Server" + // ZooKeeper vals + val zkServerContextName = "Server" + val zkClientContextName = "Client" val userSuperPasswd = "adminpasswd" val user = "fpj" val userPasswd = "fpjsecret" - val module = "org.apache.zookeeper.server.auth.DigestLoginModule" - + val zkModule = "org.apache.zookeeper.server.auth.DigestLoginModule" + //Kafka vals + val kafkaServerContextName = "KafkaServer" + val kafkaClientContextName = "KafkaClient" + val kafkaServerPrincipal = "cli...@example.com" + val kafkaClientPrincipal = "kafka/localh...@example.com" + val kafkaModule = "com.sun.security.auth.module.Krb5LoginModule" + def genZkFile: String = { - val jaasFile = java.io.File.createTempFile("jaas", "conf") + val jaasFile = java.io.File.createTempFile("jaas", ".conf") + val jaasOutputStream = new java.io.FileOutputStream(jaasFile) + writeZkToOutputStream(jaasOutputStream) + jaasOutputStream.close() + jaasFile.deleteOnExit() + jaasFile.getCanonicalPath + } + + def genKafkaFile(keytabLocation: String): String = { + val jaasFile = java.io.File.createTempFile("jaas", ".conf") val jaasOutputStream = new java.io.FileOutputStream(jaasFile) - jaasOutputStream.write(s"Server {\n\t$module required\n".getBytes) + writeKafkaToOutputStream(jaasOutputStream, keytabLocation) + jaasOutputStream.close() + jaasFile.deleteOnExit() + jaasFile.getCanonicalPath + } + + def genZkAndKafkaFile(keytabLocation: String): String = { + val jaasFile = java.io.File.createTempFile("jaas", ".conf") + val jaasOutputStream = new java.io.FileOutputStream(jaasFile) + writeKafkaToOutputStream(jaasOutputStream, keytabLocation) + jaasOutputStream.write("\n\n".getBytes) + writeZkToOutputStream(jaasOutputStream) + jaasOutputStream.close() + jaasFile.deleteOnExit() + jaasFile.getCanonicalPath + } + + private def 
writeZkToOutputStream(jaasOutputStream: java.io.FileOutputStream) { + jaasOutputStream.write(s"$zkServerContextName {\n\t$zkModule required\n".getBytes) jaasOutputStream.write(s"""\tuser_super="$userSuperPasswd"\n""".getBytes) jaasOutputStream.write(s"""\tuser_$user="$userPasswd";\n};\n\n""".getBytes) - jaasOutputStream.write(s"""Client {\n\t$module required\n""".getBytes) + jaasOutputStream.write(s"""$zkClientContextName {\n\t$zkModule required\n""".getBytes) jaasOutputStream.write(s"""\tusername="$user"\n""".getBytes) jaasOutputStream.write(s"""\tpassword="$userPasswd";\n};""".getBytes) - jaasOutputStream.close() - jaasFile.deleteOnExit() - jaasFile.getCanonicalPath + } + + private def writeKafkaToOutputStream(jaasOutputStream: java.io.FileOutputStream, keytabLocation: String) { + jaasOutputStream.write(s"$kafkaClientContextName {\n\t$kafkaModule required debug=true\n".getBytes) + jaasOutputStream.write(s"\tuseKeyTab=true\n".getBytes) + jaasOutputStream.write(s"\tstoreKey=true\n".getBytes) + jaasOutputStream.write(s"""\tserviceName="kafka"\n""".getBytes) + jaasOutputStream.write(s"""\tkeyTab="$keytabLocation"\n""".getBytes) + jaasOutputStream.write(s"""\tprincipal="$kafkaServerPrincipal";\n};\n\n""".getBytes) + jaasOutputStream.write(s"""$kafkaServerContextName {\n\t$kafkaModule required debug=true\n""".getBytes) + jaasOutputStream.write(s"\tuseKeyTab=true\n".getBytes) + jaasOutputStream.write(s"\tstoreKey=true\n".getBytes) + jaasOutputStream.write(s"""\tserviceName="kafka"\n""".getBytes) + jaasOutputStream.write(s"""\tkeyTab="$keytabLocation"\n""".getBytes) + jaasOutputStream.write(s"""\tprincipal="$kafkaClientPrincipal";\n};""".getBytes) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 88c91f4..7c928c5 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -29,13 +29,11 @@ import charset.Charset import kafka.security.auth.{Resource, Authorizer, Acl} import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.kafka.common.security.ssl.SslFactory import org.apache.kafka.common.utils.Utils._ import org.apache.kafka.test.TestSslUtils import scala.collection.mutable.{ArrayBuffer, ListBuffer} -import org.I0Itec.zkclient.{ZkClient, ZkConnection} import kafka.server._ import kafka.producer._ import kafka.message._ @@ -217,7 +215,7 @@ object TestUtils extends Logging { props.put("controlled.shutdown.retry.backoff.ms", "100") if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) }) - props.putAll(sslConfigs(Mode.SERVER, true, trustStoreFile, s"server$nodeId")) + props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId")) interBrokerSecurityProtocol.foreach { protocol => props.put(KafkaConfig.InterBrokerSecurityProtocolProp, protocol.name) @@ -444,7 +442,7 @@ object TestUtils extends Logging { certAlias: String): Properties = { val props = new Properties if (usesSslTransportLayer(securityProtocol)) - props.putAll(sslConfigs(mode, false, trustStoreFile, certAlias)) + props.putAll(sslConfigs(mode, true, trustStoreFile, certAlias)) props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name) props } @@ -473,7 +471,7 @@ object TestUtils extends Logging { 
producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString) producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) - + /* Only use these if not already set */ val defaultProps = Map( ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> "100", @@ -486,7 +484,14 @@ object TestUtils extends Logging { if (!producerProps.containsKey(key)) producerProps.put(key, value) } - producerProps.putAll(producerSecurityConfigs(securityProtocol, trustStoreFile)) + /* + * It uses CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to determine whether + * securityConfigs has been invoked already. For example, we need to + * invoke it before this call in IntegrationTestHarness, otherwise the + * SSL client auth fails. + */ + if(!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) + producerProps.putAll(producerSecurityConfigs(securityProtocol, trustStoreFile)) new KafkaProducer[Array[Byte],Array[Byte]](producerProps) } @@ -503,29 +508,42 @@ object TestUtils extends Logging { * Create a new consumer with a few pre-configured properties. */ def createNewConsumer(brokerList: String, - groupId: String, + groupId: String = "group", autoOffsetReset: String = "earliest", partitionFetchSize: Long = 4096L, partitionAssignmentStrategy: String = classOf[RangeAssignor].getName, sessionTimeout: Int = 30000, securityProtocol: SecurityProtocol, - trustStoreFile: Option[File] = None) : KafkaConsumer[Array[Byte],Array[Byte]] = { + trustStoreFile: Option[File] = None, + props: Option[Properties] = None) : KafkaConsumer[Array[Byte],Array[Byte]] = { import org.apache.kafka.clients.consumer.ConsumerConfig - val consumerProps= new Properties() + val consumerProps = props.getOrElse(new Properties()) consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset) consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, partitionFetchSize.toString) - consumerProps.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, "100") - consumerProps.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200") - consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") - consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") - consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partitionAssignmentStrategy) - consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toString) + val defaultProps = Map( + ConsumerConfig.RETRY_BACKOFF_MS_CONFIG -> "100", + ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG -> "200", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArrayDeserializer", + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArrayDeserializer", + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> partitionAssignmentStrategy, + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> sessionTimeout.toString, + ConsumerConfig.GROUP_ID_CONFIG -> groupId) - consumerProps.putAll(consumerSecurityConfigs(securityProtocol, trustStoreFile)) + defaultProps.foreach { case (key, value) => + if (!consumerProps.containsKey(key)) consumerProps.put(key, value) + } + + /* + * It uses CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to determine whether + 
* securityConfigs has been invoked already. For example, we need to + * invoke it before this call in IntegrationTestHarness, otherwise the + * SSL client auth fails. + */ + if(!consumerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) + consumerProps.putAll(consumerSecurityConfigs(securityProtocol, trustStoreFile)) new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps) } @@ -990,12 +1008,7 @@ object TestUtils extends Logging { } - val sslConfigs = { - if (mode == Mode.SERVER) - TestSslUtils.createSslConfig(true, true, mode, trustStore, certAlias) - else - TestSslUtils.createSslConfig(clientCert, false, mode, trustStore, certAlias) - } + val sslConfigs = TestSslUtils.createSslConfig(clientCert, true, mode, trustStore, certAlias) val sslProps = new Properties() sslConfigs.foreach { case (k, v) => sslProps.put(k, v) }