Repository: kafka Updated Branches: refs/heads/trunk a420d20c0 -> 0bede30ad
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala new file mode 100644 index 0000000..19c386f --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package kafka.server + +import java.io.File + +import org.apache.kafka.common.protocol.SecurityProtocol +import org.junit.{After, Before, Test} +import kafka.zk.ZooKeeperTestHarness +import kafka.utils.TestUtils +import TestUtils._ +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.StringSerializer + +class ReplicaFetchTest extends ZooKeeperTestHarness { + var brokers: Seq[KafkaServer] = null + val topic1 = "foo" + val topic2 = "bar" + + @Before + override def setUp() { + super.setUp() + val props = createBrokerConfigs(2, zkConnect) + brokers = props.map(KafkaConfig.fromProps).map(TestUtils.createServer(_)) + } + + @After + override def tearDown() { + brokers.foreach(_.shutdown()) + super.tearDown() + } + + @Test + def testReplicaFetcherThread() { + val partition = 0 + val testMessageList1 = List("test1", "test2", "test3", "test4") + val testMessageList2 = List("test5", "test6", "test7", "test8") + + // create a topic and partition and await leadership + for (topic <- List(topic1,topic2)) { + createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 2, servers = brokers) + } + + // send test messages to leader + val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers), + retries = 5, + keySerializer = new StringSerializer, + valueSerializer = new StringSerializer) + val records = testMessageList1.map(m => new ProducerRecord(topic1, m, m)) ++ + testMessageList2.map(m => new ProducerRecord(topic2, m, m)) + records.map(producer.send).foreach(_.get) + producer.close() + + def logsMatch(): Boolean = { + var result = true + for (topic <- List(topic1, topic2)) { + val tp = new TopicPartition(topic, partition) + val expectedOffset = brokers.head.getLogManager().getLog(tp).get.logEndOffset + result = result && expectedOffset > 0 && brokers.forall { item => + expectedOffset == item.getLogManager().getLog(tp).get.logEndOffset + } + } + result + } + waitUntilTrue(logsMatch _, "Broker logs should be identical") + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 92a518d..1ee4ac8 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -24,19 +24,31 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse} import org.apache.kafka.common.requests.SaslHandshakeRequest import org.apache.kafka.common.requests.SaslHandshakeResponse -import org.junit.Test +import org.junit.{After, Before, Test} import org.junit.Assert._ -import kafka.api.SaslTestHarness +import kafka.api.{KafkaSasl, SaslSetup} +import kafka.utils.JaasTestUtils -class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness { +class SaslApiVersionsRequestTest extends BaseRequestTest with SaslSetup { override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT - override protected val kafkaClientSaslMechanism = "PLAIN" - override protected val kafkaServerSaslMechanisms = List("PLAIN") - override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) - override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) - override protected val zkSaslEnabled = false + private val kafkaClientSaslMechanism = "PLAIN" + private val kafkaServerSaslMechanisms = List("PLAIN") + protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) + protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) override def numBrokers = 1 + @Before + override def setUp(): Unit = { + startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName)) + super.setUp() + } + + @After + override def tearDown(): Unit = { + super.tearDown() + closeSasl() + } + @Test def testApiVersionsRequestBeforeSaslHandshakeRequest() { val plaintextSocket = connect(protocol = securityProtocol) http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala deleted file mode 100644 index 435a0f0..0000000 --- a/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import kafka.api.SaslTestHarness -import org.apache.kafka.common.protocol.SecurityProtocol - -class SaslPlaintextReplicaFetchTest extends BaseReplicaFetchTest with SaslTestHarness { - override protected val zkSaslEnabled = false - protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT - protected def trustStoreFile = None -} http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala deleted file mode 100644 index 3af8485..0000000 --- a/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import java.io.File - -import kafka.api.SaslTestHarness -import org.apache.kafka.common.protocol.SecurityProtocol - -class SaslSslReplicaFetchTest extends BaseReplicaFetchTest with SaslTestHarness { - override protected val zkSaslEnabled = false - protected def securityProtocol = SecurityProtocol.SASL_SSL - protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) -} http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala deleted file mode 100644 index dad2285..0000000 --- a/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import java.io.File - -import org.apache.kafka.common.protocol.SecurityProtocol - -class SslReplicaFetchTest extends BaseReplicaFetchTest { - protected def securityProtocol = SecurityProtocol.SSL - protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) -} http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala index 3ae680c..d10e861 100644 --- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala @@ -25,68 +25,67 @@ object JaasTestUtils { keyTab: String, principal: String, debug: Boolean, - serviceName: Option[String]) { - def toJaasModule: JaasModule = { - JaasModule( - "com.sun.security.auth.module.Krb5LoginModule", - debug = debug, - entries = Map( - "useKeyTab" -> useKeyTab.toString, - "storeKey" -> storeKey.toString, - "keyTab" -> keyTab, - "principal" -> principal - ) ++ serviceName.map(s => Map("serviceName" -> s)).getOrElse(Map.empty) - ) - } + serviceName: Option[String]) extends JaasModule { + + def name = "com.sun.security.auth.module.Krb5LoginModule" + + def entries: Map[String, String] = Map( + "useKeyTab" -> useKeyTab.toString, + "storeKey" -> storeKey.toString, + "keyTab" -> keyTab, + "principal" -> principal + ) ++ serviceName.map(s => Map("serviceName" -> s)).getOrElse(Map.empty) + } case class PlainLoginModule(username: String, password: String, debug: Boolean = false, - validUsers: Map[String, String] = Map.empty) { - def toJaasModule: JaasModule = { - JaasModule( - "org.apache.kafka.common.security.plain.PlainLoginModule", - debug = debug, - entries = Map( - "username" -> username, - "password" -> password - ) ++ validUsers.map { case (user, pass) => s"user_$user" -> pass } - ) - } + validUsers: Map[String, String] = Map.empty) extends JaasModule { + + def name = "org.apache.kafka.common.security.plain.PlainLoginModule" + + def entries: Map[String, String] = Map( + "username" -> username, + "password" -> password + ) ++ validUsers.map { case (user, pass) => s"user_$user" -> pass } + + } + + case class ZkDigestModule(debug: Boolean = false, + entries: Map[String, String] = Map.empty) extends JaasModule { + def name = "org.apache.zookeeper.server.auth.DigestLoginModule" } case class ScramLoginModule(username: String, password: String, - debug: Boolean = false) { - def toJaasModule: JaasModule = { - JaasModule( - "org.apache.kafka.common.security.scram.ScramLoginModule", - debug = debug, - entries = Map( - "username" -> username, - "password" -> password - ) - ) - } + debug: Boolean = false) extends JaasModule { + + def name = "org.apache.kafka.common.security.scram.ScramLoginModule" + + def entries: Map[String, String] = Map( + "username" -> username, + "password" -> password + ) } - case class JaasModule(moduleName: String, - debug: Boolean, - entries: Map[String, String]) { + sealed trait JaasModule { + def name: String + def debug: Boolean + def entries: Map[String, String] + override def toString: String = { - s"""$moduleName required + s"""$name required | debug=$debug | ${entries.map { case (k, v) => s"""$k="$v"""" }.mkString("", "\n| ", ";")} |""".stripMargin } } - case class JaasSection(contextName: String, - jaasModule: Seq[JaasModule]) { + case class JaasSection(contextName: String, modules: Seq[JaasModule]) { override def toString: String = { s"""|$contextName { - | ${jaasModule.mkString("\n ")} + | ${modules.mkString("\n ")} |}; |""".stripMargin } @@ -97,7 +96,6 @@ object JaasTestUtils { private val ZkUserSuperPasswd = "adminpasswd" private val ZkUser = "fpj" private val ZkUserPassword = "fpjsecret" - private val ZkModule = "org.apache.zookeeper.server.auth.DigestLoginModule" val KafkaServerContextName = "KafkaServer" val KafkaServerPrincipalUnqualifiedName = "kafka" @@ -122,10 +120,10 @@ object JaasTestUtils { val KafkaScramAdmin = "scram-admin" val KafkaScramAdminPassword = "scram-admin-secret" - def writeJaasContextsToFile(jaasContexts: Seq[JaasSection]): String = { + def writeJaasContextsToFile(jaasSections: Seq[JaasSection]): File = { val jaasFile = TestUtils.tempFile() - writeToFile(jaasFile,jaasContexts) - jaasFile.getCanonicalPath + writeToFile(jaasFile, jaasSections) + jaasFile } // Returns the dynamic configuration, using credentials for user #1 @@ -133,11 +131,13 @@ object JaasTestUtils { kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword).toString def zkSections: Seq[JaasSection] = Seq( - new JaasSection(ZkServerContextName, Seq(JaasModule(ZkModule, false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))), - new JaasSection(ZkClientContextName, Seq(JaasModule(ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword)))) + new JaasSection(ZkServerContextName, Seq(ZkDigestModule(debug = false, + Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))), + new JaasSection(ZkClientContextName, Seq(ZkDigestModule(debug = false, + Map("username" -> ZkUser, "password" -> ZkUserPassword)))) ) - def kafkaServerSection(contextName: String, mechanisms: List[String], keytabLocation: Option[File]): JaasSection = { + def kafkaServerSection(contextName: String, mechanisms: Seq[String], keytabLocation: Option[File]): JaasSection = { val modules = mechanisms.map { case "GSSAPI" => Krb5LoginModule( @@ -146,18 +146,22 @@ object JaasTestUtils { keyTab = keytabLocation.getOrElse(throw new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath, principal = KafkaServerPrincipal, debug = true, - serviceName = Some("kafka")).toJaasModule + serviceName = Some("kafka")) case "PLAIN" => PlainLoginModule( KafkaPlainAdmin, KafkaPlainAdminPassword, debug = false, - Map(KafkaPlainAdmin -> KafkaPlainAdminPassword, KafkaPlainUser -> KafkaPlainPassword, KafkaPlainUser2 -> KafkaPlainPassword2)).toJaasModule + Map( + KafkaPlainAdmin -> KafkaPlainAdminPassword, + KafkaPlainUser -> KafkaPlainPassword, + KafkaPlainUser2 -> KafkaPlainPassword2 + )) case "SCRAM-SHA-256" | "SCRAM-SHA-512" => ScramLoginModule( KafkaScramAdmin, KafkaScramAdminPassword, - debug = false).toJaasModule + debug = false) case mechanism => throw new IllegalArgumentException("Unsupported server mechanism " + mechanism) } new JaasSection(contextName, modules) @@ -177,17 +181,17 @@ object JaasTestUtils { principal = clientPrincipal, debug = true, serviceName = Some("kafka") - ).toJaasModule + ) case "PLAIN" => PlainLoginModule( plainUser, plainPassword - ).toJaasModule + ) case "SCRAM-SHA-256" | "SCRAM-SHA-512" => ScramLoginModule( scramUser, scramPassword - ).toJaasModule + ) case mechanism => throw new IllegalArgumentException("Unsupported client mechanism " + mechanism) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 05d9686..01ff83d 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -227,7 +227,7 @@ object TestUtils extends Logging { if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) }) props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId")) - if (protocolAndPorts.exists { case (protocol, _) => usesSaslTransportLayer(protocol) }) + if (protocolAndPorts.exists { case (protocol, _) => usesSaslAuthentication(protocol) }) props.putAll(saslConfigs(saslProperties)) interBrokerSecurityProtocol.foreach { protocol => @@ -492,7 +492,7 @@ object TestUtils extends Logging { val props = new Properties if (usesSslTransportLayer(securityProtocol)) props.putAll(sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile, certAlias)) - if (usesSaslTransportLayer(securityProtocol)) + if (usesSaslAuthentication(securityProtocol)) props.putAll(saslConfigs(saslProperties)) props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name) props @@ -549,12 +549,12 @@ object TestUtils extends Logging { new KafkaProducer[K, V](producerProps, keySerializer, valueSerializer) } - private def usesSslTransportLayer(securityProtocol: SecurityProtocol): Boolean = securityProtocol match { + def usesSslTransportLayer(securityProtocol: SecurityProtocol): Boolean = securityProtocol match { case SecurityProtocol.SSL | SecurityProtocol.SASL_SSL => true case _ => false } - private def usesSaslTransportLayer(securityProtocol: SecurityProtocol): Boolean = securityProtocol match { + def usesSaslAuthentication(securityProtocol: SecurityProtocol): Boolean = securityProtocol match { case SecurityProtocol.SASL_PLAINTEXT | SecurityProtocol.SASL_SSL => true case _ => false } http://git-wip-us.apache.org/repos/asf/kafka/blob/0bede30a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala index c9076b5..75625cd 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala @@ -53,13 +53,12 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness { @Before override def setUp() { - if(secure) { + if (secure) { Configuration.setConfiguration(null) - System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile) + System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath) System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider") - if(!JaasUtils.isZkSecurityEnabled()) { + if (!JaasUtils.isZkSecurityEnabled) fail("Secure access not enabled") - } } super.setUp }
