Repository: kafka
Updated Branches:
  refs/heads/trunk 1dd558aec -> 4a8acdf9d


KAFKA-2732; Add class for ZK Auth.

Author: Flavio Junqueira <f...@apache.org>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Ben Stopford 
<benstopf...@gmail.com>, Jun Rao <jun...@gmail.com>

Closes #410 from fpj/KAFKA-2732


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a8acdf9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a8acdf9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a8acdf9

Branch: refs/heads/trunk
Commit: 4a8acdf9d7135a50b751a0bea9e205ea4a071e5c
Parents: 1dd558a
Author: Flavio Junqueira <f...@apache.org>
Authored: Sat Nov 28 08:23:35 2015 -0800
Committer: Jun Rao <jun...@gmail.com>
Committed: Sat Nov 28 08:23:35 2015 -0800

----------------------------------------------------------------------
 .../kafka/api/EndToEndAuthorizationTest.scala   | 273 +++++++++++++++++++
 .../kafka/api/IntegrationTestHarness.scala      |  19 +-
 .../kafka/api/SaslPlaintextConsumerTest.scala   |   1 +
 .../scala/integration/kafka/api/SaslSetup.scala |  87 ++++++
 .../kafka/api/SaslSslConsumerTest.scala         |   4 +
 .../api/SaslSslEndToEndAuthorizationTest.scala  |  25 ++
 .../integration/kafka/api/SaslTestHarness.scala |  59 +---
 .../api/SslEndToEndAuthorizationTest.scala      |  29 ++
 .../integration/KafkaServerTestHarness.scala    |  20 +-
 .../SaslPlaintextTopicMetadataTest.scala        |   1 +
 .../integration/SaslSslTopicMetadataTest.scala  |   1 +
 .../server/SaslPlaintextReplicaFetchTest.scala  |   1 +
 .../kafka/server/SaslSslReplicaFetchTest.scala  |   1 +
 .../scala/unit/kafka/utils/JaasTestUtils.scala  |  65 ++++-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  59 ++--
 15 files changed, 554 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
new file mode 100644
index 0000000..59cff14
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.io.File
+import java.util.ArrayList
+import java.util.concurrent.ExecutionException
+
+import kafka.admin.AclCommand
+import kafka.common.TopicAndPartition
+import kafka.security.auth._
+import kafka.server._
+import kafka.utils._
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, 
ConsumerConfig}
+import org.apache.kafka.clients.producer.{ProducerRecord, ProducerConfig}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.{TopicPartition}
+import org.apache.kafka.common.protocol.SecurityProtocol
+import 
org.apache.kafka.common.errors.{GroupAuthorizationException,TopicAuthorizationException}
+import org.junit.Assert._
+import org.junit.{Test, After, Before}
+
+import scala.collection.JavaConverters._
+
+
+/**
+  * The test cases here verify that a producer authorized to publish to a topic
+  * is able to, and that consumers in a group authorized to consume are able
+  * to do so.
+  *
+  * This test relies on a chain of test harness traits to set up. It directly
+  * extends IntegrationTestHarness. IntegrationTestHarness creates producers 
and
+  * consumers, and it extends KafkaServerTestHarness. KafkaServerTestHarness 
starts
+  * brokers, but first it initializes a ZooKeeper server and client, which 
happens
+  * in ZooKeeperTestHarness.
+  *
+  * To start brokers when the security protocol is SASL_SSL, we need to set a 
cluster
+  * ACL, which happens optionally in KafkaServerTestHarness. If the security 
protocol
+  * is SSL or PLAINTEXT, then the ACL isn't set. The remaining ACLs to enable 
access
+  * to producers and consumers are set here. To set ACLs, we use AclCommand 
directly.
+  *
+  * Finally, we rely on SaslSetup to bootstrap and setup Kerberos. We don't use
+  * SaslTestHarness here directly because it extends ZooKeeperTestHarness, and 
we
+  * would end up with ZooKeeperTestHarness twice.
+  */
+trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
+  override val producerCount = 1
+  override val consumerCount = 2
+  override val serverCount = 3
+  // Hook run by the parent harness: adds the cluster ACL described by
+  // clusterAclArgs through AclCommand and then calls
+  // TestUtils.waitAndVerifyAcls for ClusterActionAcl on the cluster resource,
+  // using the authorizer of the first broker.
+  override val setClusterAcl = Some { () =>
+    AclCommand.main(clusterAclArgs)
+    TestUtils.waitAndVerifyAcls(ClusterActionAcl, servers.head.apis.authorizer.get, clusterResource)
+    ()
+  }
+  val numRecords = 1
+  val group = "group"
+  val topic = "e2etopic"
+  val part = 0
+  val tp = new TopicPartition(topic, part)
+  val topicAndPartition = new TopicAndPartition(topic, part)
+  val clientPrincipal: String
+  val kafkaPrincipal: String
+
+  override protected lazy val trustStoreFile = 
Some(File.createTempFile("truststore", ".jks"))
+
+  val topicResource = new Resource(Topic, topic)
+  val groupResource = new Resource(Group, group)
+  val clusterResource = Resource.ClusterResource
+
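+  // Arguments to AclCommand to set ACLs. There are four definitions here:
+  // 1- clusterAclArgs: allows the ClusterAction operation on the cluster for the kafka principal
+  // 2- produceAclArgs: adds the --producer ACLs on the topic for the client principal
+  // 3- consumeAclArgs: adds the --consumer ACLs on the topic and consumer group for the client principal
+  // 4- groupAclArgs: allows only the Read operation on the consumer group for the client principal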
+  // AclCommand arguments allowing ClusterAction on the cluster for the kafka principal.
+  def clusterAclArgs: Array[String] = Array(
+    "--authorizer-properties",
+    s"zookeeper.connect=$zkConnect",
+    "--add",
+    "--cluster",
+    "--operation=ClusterAction",
+    s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal")
+
+  // AclCommand arguments adding the --producer ACLs on the test topic for the client principal.
+  def produceAclArgs: Array[String] = Array(
+    "--authorizer-properties",
+    s"zookeeper.connect=$zkConnect",
+    "--add",
+    s"--topic=$topic",
+    "--producer",
+    s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+
+  // AclCommand arguments adding the --consumer ACLs on the test topic and group for the client principal.
+  def consumeAclArgs: Array[String] = Array(
+    "--authorizer-properties",
+    s"zookeeper.connect=$zkConnect",
+    "--add",
+    s"--topic=$topic",
+    s"--group=$group",
+    "--consumer",
+    s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+
+  // AclCommand arguments allowing only the Read operation on the test group for the client principal.
+  def groupAclArgs: Array[String] = Array(
+    "--authorizer-properties",
+    s"zookeeper.connect=$zkConnect",
+    "--add",
+    s"--group=$group",
+    "--operation=Read",
+    s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+  // Expected ACL sets, used with TestUtils.waitAndVerifyAcls. Each set holds a
+  // single Allow entry for any host (Acl.WildCardHost).
+
+  // ClusterAction for the kafka principal.
+  def ClusterActionAcl:Set[Acl] = {
+    val principal = new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal)
+    Set(new Acl(principal, Allow, Acl.WildCardHost, ClusterAction))
+  }
+  // Read for the client principal (checked against the group resource).
+  def GroupReadAcl = {
+    val principal = new KafkaPrincipal(kafkaPrincipalType, clientPrincipal)
+    Set(new Acl(principal, Allow, Acl.WildCardHost, Read))
+  }
+  // Read for the client principal (checked against the topic resource).
+  def TopicReadAcl = {
+    val principal = new KafkaPrincipal(kafkaPrincipalType, clientPrincipal)
+    Set(new Acl(principal, Allow, Acl.WildCardHost, Read))
+  }
+  // Write for the client principal.
+  def TopicWriteAcl = {
+    val principal = new KafkaPrincipal(kafkaPrincipalType, clientPrincipal)
+    Set(new Acl(principal, Allow, Acl.WildCardHost, Write))
+  }
+  // Describe for the client principal.
+  def TopicDescribeAcl = {
+    val principal = new KafkaPrincipal(kafkaPrincipalType, clientPrincipal)
+    Set(new Acl(principal, Allow, Acl.WildCardHost, Describe))
+  }
+  // The next two configuration parameters enable ZooKeeper secure ACLs
+  // and set the Kafka authorizer, both necessary to enable security.
+  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+  this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
+  // Some needed configuration for brokers, producers, and consumers
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
+  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")
+  // Use the `group` val (declared earlier in this trait) rather than repeating
+  // the literal, so the consumer group id cannot drift from the group used
+  // for groupResource and the --group ACL arguments.
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
+
+  /**
+    * Calls startSasl (which starts the MiniKDC) before delegating to the
+    * parent setUp, and finally creates the test topic.
+    */
+  @Before
+  override def setUp {
+    // SSL uses the ZooKeeper-only SASL mode; every other protocol uses Both.
+    val saslMode: SaslSetupMode = if (securityProtocol == SecurityProtocol.SSL) ZkSasl else Both
+    startSasl(saslMode)
+    super.setUp
+    // create the test topic, handing all the brokers to the helper
+    TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers)
+  }
+
+  /**
+    * Runs the parent tearDown first and closeSasl() (which stops the MiniKDC)
+    * last, mirroring setUp where SASL is started before the parent setUp.
+    */
+  @After
+  override def tearDown {
+    super.tearDown
+    closeSasl()
+  }
+
+  /**
+    * Verifies that producing and consuming both succeed once the producer and
+    * consumer ACLs have been added for the client principal.
+    */
+  @Test
+  def testProduceConsume {
+    AclCommand.main(produceAclArgs)
+    AclCommand.main(consumeAclArgs)
+    val expectedTopicAcls = TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl
+    val authorizer = servers.head.apis.authorizer.get
+    TestUtils.waitAndVerifyAcls(expectedTopicAcls, authorizer, topicResource)
+    TestUtils.waitAndVerifyAcls(GroupReadAcl, authorizer, groupResource)
+    // Produce
+    debug("Starting to send records")
+    sendRecords(numRecords, tp)
+    // Consume from the explicitly assigned partition
+    debug("Finished sending and starting to consume records")
+    val consumer = consumers.head
+    consumer.assign(List(tp).asJava)
+    consumeRecords(consumer)
+    debug("Finished consuming")
+  }
+
+  /**
+    * Verifies that sending fails with a TopicAuthorizationException when no
+    * produce ACL has been added.
+    */
+  @Test
+  def testNoProduceAcl {
+    debug("Starting to send records")
+    try {
+      sendRecords(numRecords, tp)
+      fail("Topic authorization exception expected")
+    } catch {
+      case _: TopicAuthorizationException => // expected, no ACL was added for the producer
+    }
+  }
+
+  /**
+    * Verifies that consuming fails with a TopicAuthorizationException when the
+    * client has the producer ACLs and the group Read ACL, but the consumer
+    * ACLs on the topic were never added.
+    */
+  @Test
+  def testNoConsumeAcl {
+    AclCommand.main(produceAclArgs)
+    AclCommand.main(groupAclArgs)
+    val expectedTopicAcls = TopicWriteAcl ++ TopicDescribeAcl
+    val authorizer = servers.head.apis.authorizer.get
+    TestUtils.waitAndVerifyAcls(expectedTopicAcls, authorizer, topicResource)
+    TestUtils.waitAndVerifyAcls(GroupReadAcl, authorizer, groupResource)
+    // Producing is allowed
+    debug("Starting to send records")
+    sendRecords(numRecords, tp)
+    // Consuming must be rejected
+    debug("Finished sending and starting to consume records")
+    val consumer = consumers.head
+    consumer.assign(List(tp).asJava)
+    try {
+      consumeRecords(consumer)
+      fail("Topic authorization exception expected")
+    } catch {
+      case _: TopicAuthorizationException => // expected, no topic Read ACL for the consumer
+    }
+  }
+
+  /**
+    * Tests that a consumer fails to consume messages with a
+    * GroupAuthorizationException when only the producer ACLs are set, i.e.
+    * no ACL has been added for the consumer group.
+    */
+  @Test
+  def testNoGroupAcl {
+    AclCommand.main(produceAclArgs)
+    TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, servers.head.apis.authorizer.get, topicResource)
+    //Produce records
+    debug("Starting to send records")
+    sendRecords(numRecords, tp)
+    //Consume records
+    debug("Finished sending and starting to consume records")
+    consumers.head.assign(List(tp).asJava)
+    try{
+      consumeRecords(this.consumers.head)
+      // The failure message names the exception this test actually expects.
+      fail("Group authorization exception expected")
+    } catch {
+      case e: GroupAuthorizationException => //expected
+    }
+  }
+  
+  /**
+    * Sends `numRecords` records to `tp` with the first producer, using the
+    * record index as both key and value, and waits for every send to
+    * complete. An ExecutionException from a send future is unwrapped and its
+    * cause is rethrown.
+    */
+  private def sendRecords(numRecords: Int, tp: TopicPartition) {
+    // All sends are issued before any future is awaited.
+    val pending = for (idx <- 0 until numRecords) yield {
+      val rec = new ProducerRecord(tp.topic(), tp.partition(), s"$idx".getBytes, s"$idx".getBytes)
+      debug(s"Sending this record: $rec")
+      this.producers.head.send(rec)
+    }
+    try {
+      for (ack <- pending) ack.get
+    } catch {
+      case wrapped: ExecutionException => throw wrapped.getCause
+    }
+  }
+
+  /**
+    * Polls `consumer` until at least `numRecords` records have been received,
+    * then asserts topic, partition and offset of the first `numRecords` of
+    * them, with offsets expected to start at `startingOffset`.
+    *
+    * Throws IllegalStateException when the records have not arrived after
+    * more than `numRecords * 50` polling iterations.
+    */
+  private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
+                             numRecords: Int = 1,
+                             startingOffset: Int = 0,
+                             topic: String = topic,
+                             part: Int = part) {
+    val received = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
+    val iterationLimit = numRecords * 50
+    var attempts = 0
+    while (received.size < numRecords) {
+      consumer.poll(50).asScala.foreach(rec => received.add(rec))
+      if (attempts > iterationLimit)
+        throw new IllegalStateException("Failed to consume the expected records after " + attempts + " iterations.")
+      attempts += 1
+    }
+    (0 until numRecords).foreach { idx =>
+      val rec = received.get(idx)
+      val expectedOffset = startingOffset + idx
+      assertEquals(topic, rec.topic())
+      assertEquals(part, rec.partition())
+      assertEquals(expectedOffset.toLong, rec.offset())
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index b7ecc9d..7aaa185 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -53,19 +53,26 @@ trait IntegrationTestHarness extends KafkaServerTestHarness 
{
 
   @Before
   override def setUp() {
+    val producerSecurityProps = 
TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile)
+    val consumerSecurityProps = 
TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile)
     super.setUp()
-    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
-    producerConfig.putAll(TestUtils.producerSecurityConfigs(securityProtocol, 
trustStoreFile))
-    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerConfig.putAll(producerSecurityProps)
     consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-    consumerConfig.putAll(TestUtils.consumerSecurityConfigs(securityProtocol, 
trustStoreFile))
+    consumerConfig.putAll(consumerSecurityProps)
     for (i <- 0 until producerCount)
-      producers += new KafkaProducer(producerConfig)
+      producers += TestUtils.createNewProducer(brokerList, 
+                                               acks = 1,
+                                               securityProtocol = 
this.securityProtocol,
+                                               trustStoreFile = 
this.trustStoreFile,
+                                               props = Some(producerConfig))
     for (i <- 0 until consumerCount) {
-      consumers += new KafkaConsumer(consumerConfig)
+      consumers += TestUtils.createNewConsumer(brokerList,
+                                               securityProtocol = 
this.securityProtocol,
+                                               trustStoreFile = 
this.trustStoreFile,
+                                               props = Some(consumerConfig))
     }
 
     // create the consumer offset topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
index e6f0c2b..0d11dc1 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
@@ -15,5 +15,6 @@ package kafka.api
 import org.apache.kafka.common.protocol.SecurityProtocol
 
 class SaslPlaintextConsumerTest extends BaseConsumerTest with SaslTestHarness {
+  override protected val zkSaslEnabled = false
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala 
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
new file mode 100644
index 0000000..d49b5bd
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.io.{File}
+import javax.security.auth.login.Configuration
+
+import kafka.utils.{JaasTestUtils,TestUtils}
+import org.apache.hadoop.minikdc.MiniKdc
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.security.kerberos.LoginManager
+
+/*
+ * Implements an enumeration for the modes enabled here:
+ *  - ZkSasl:    zk only
+ *  - KafkaSasl: kafka only
+ *  - Both:      zk and kafka
+ * The trait is sealed, so these three case objects are the only modes.
+ */
+sealed trait SaslSetupMode
+case object ZkSasl extends SaslSetupMode
+case object KafkaSasl extends SaslSetupMode
+case object Both extends SaslSetupMode
+
+/*
+ * Trait used in SaslTestHarness and EndToEndAuthorizationTest
+ * currently to setup a keytab and jaas files.
+ */
+trait SaslSetup {
+  private val workDir = new File(System.getProperty("test.dir", "target"))
+  private val kdcConf = MiniKdc.createConf()
+  private val kdc = new MiniKdc(kdcConf, workDir)
+
+  /**
+    * Prepares the keytab and JAAS configuration for `mode`, starts the
+    * MiniKdc and creates the "client" and "kafka/localhost" principals in the
+    * keytab. For the Both and ZkSasl modes it also sets the
+    * "zookeeper.authProvider.1" system property.
+    */
+  def startSasl(mode: SaslSetupMode = Both) {
+    // Important if tests leak consumers, producers or brokers
+    LoginManager.closeAll()
+    val keytab = createKeytabAndSetConfiguration(mode)
+    kdc.start()
+    kdc.createPrincipal(keytab, "client", "kafka/localhost")
+    mode match {
+      case Both | ZkSasl =>
+        System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
+      case _ => // any other mode: the ZooKeeper auth provider is not set
+    }
+  }
+
+  /**
+    * Creates the keytab and JAAS files for `mode`, resets the JAAS
+    * Configuration and points the login-config system property at the new
+    * JAAS file.
+    *
+    * @return the keytab file
+    */
+  protected def createKeytabAndSetConfiguration(mode: SaslSetupMode): File = {
+    val files = createKeytabAndJaasFiles(mode)
+    // This will cause a reload of the Configuration singleton when `getConfiguration` is called
+    Configuration.setConfiguration(null)
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, files._2.getAbsolutePath)
+    files._1
+  }
+
+  /**
+    * Creates a temp file for the keytab and generates, through JaasTestUtils,
+    * the JAAS file matching `mode`.
+    *
+    * @return the pair (keytab file, JAAS file)
+    */
+  private def createKeytabAndJaasFiles(mode: SaslSetupMode): (File, File) = {
+    val keytabFile = TestUtils.tempFile()
+    // Every mode is listed explicitly (no catch-all) so that the compiler can
+    // flag this match if a new SaslSetupMode is ever added.
+    val jaasFileName: String = mode match {
+      case ZkSasl =>
+        JaasTestUtils.genZkFile
+      case KafkaSasl =>
+        JaasTestUtils.genKafkaFile(keytabFile.getAbsolutePath)
+      case Both =>
+        JaasTestUtils.genZkAndKafkaFile(keytabFile.getAbsolutePath)
+    }
+    val jaasFile = new File(jaasFileName)
+
+    (keytabFile, jaasFile)
+  }
+
+  /**
+    * Stops the MiniKdc and undoes the global state set by startSasl: closes
+    * the login managers, clears the JAAS login-config and ZooKeeper auth
+    * provider system properties, and resets the JAAS Configuration.
+    */
+  def closeSasl() {
+    try {
+      kdc.stop()
+    } finally {
+      // Run the cleanup even if stopping the KDC throws, so that the system
+      // properties set here do not remain set for later tests in the same JVM.
+      // Important if tests leak consumers, producers or brokers
+      LoginManager.closeAll()
+      System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+      System.clearProperty("zookeeper.authProvider.1")
+      Configuration.setConfiguration(null)
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
index 4f8512a..388ddaf 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
@@ -15,8 +15,12 @@ package kafka.api
 import java.io.File
 
 import org.apache.kafka.common.protocol.SecurityProtocol
+import kafka.server.KafkaConfig
 
 class SaslSslConsumerTest extends BaseConsumerTest with SaslTestHarness {
+  override protected val zkSaslEnabled = true
+  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = 
Some(File.createTempFile("truststore", ".jks"))
+  
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala
new file mode 100644
index 0000000..0ca0904
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/api/SaslSslEndToEndAuthorizationTest.scala
@@ -0,0 +1,25 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.api
+
+import org.apache.kafka.common.protocol.SecurityProtocol
+
+/**
+  * Runs the EndToEndAuthorizationTest cases with the SASL_SSL security
+  * protocol, using "client" as clientPrincipal and "kafka" as kafkaPrincipal.
+  */
+class SaslSslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
+  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+  override val clientPrincipal = "client"
+  override val kafkaPrincipal = "kafka"
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
index 14ceb43..b4ae74f 100644
--- a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
@@ -12,70 +12,25 @@
   */
 package kafka.api
 
-import java.io.{FileWriter, BufferedWriter, File}
-import javax.security.auth.login.Configuration
-
-import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
-import org.apache.hadoop.minikdc.MiniKdc
-import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.security.kerberos.LoginManager
 import org.junit.{After, Before}
 
-trait SaslTestHarness extends ZooKeeperTestHarness {
-
-  private val workDir = new File(System.getProperty("test.dir", "target"))
-  private val kdcConf = MiniKdc.createConf()
-  private val kdc = new MiniKdc(kdcConf, workDir)
+trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup {
+  protected val zkSaslEnabled: Boolean
 
   @Before
   override def setUp() {
-    // Important if tests leak consumers, producers or brokers
-    LoginManager.closeAll()
-    val keytabFile = createKeytabAndSetConfiguration("kafka_jaas.conf")
-    kdc.start()
-    kdc.createPrincipal(keytabFile, "client", "kafka/localhost")
+    if (zkSaslEnabled)
+      startSasl(Both)
+    else
+      startSasl(KafkaSasl)
     super.setUp
   }
 
-  protected def createKeytabAndSetConfiguration(jaasResourceName: String): 
File = {
-    val (keytabFile, jaasFile) = createKeytabAndJaasFiles(jaasResourceName)
-    // This will cause a reload of the Configuration singleton when 
`getConfiguration` is called
-    Configuration.setConfiguration(null)
-    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, 
jaasFile.getAbsolutePath)
-    keytabFile
-  }
-
-  private def createKeytabAndJaasFiles(jaasResourceName: String): (File, File) 
= {
-    val keytabFile = TestUtils.tempFile()
-    val jaasFile = TestUtils.tempFile()
-
-    val writer = new BufferedWriter(new FileWriter(jaasFile))
-    val inputStream = 
Thread.currentThread().getContextClassLoader.getResourceAsStream(jaasResourceName)
-    if (inputStream == null)
-      throw new IllegalStateException(s"Could not find `$jaasResourceName`, 
make sure it is in the classpath")
-    val source = io.Source.fromInputStream(inputStream, "UTF-8")
-
-    for (line <- source.getLines) {
-      val replaced = line.replaceAll("\\$keytab-location", 
keytabFile.getAbsolutePath)
-      writer.write(replaced)
-      writer.newLine()
-    }
-
-    writer.close()
-    source.close()
-
-    (keytabFile, jaasFile)
-  }
-
   @After
   override def tearDown() {
     super.tearDown
-    kdc.stop()
-    // Important if tests leak consumers, producers or brokers
-    LoginManager.closeAll()
-    System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
-    Configuration.setConfiguration(null)
+    closeSasl()
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
new file mode 100644
index 0000000..15e8527
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
@@ -0,0 +1,29 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.protocol.SecurityProtocol
+
+
+/**
+  * Runs the EndToEndAuthorizationTest cases with the SSL security protocol.
+  * SSL client authentication is set to "required" on the brokers, and the
+  * client and kafka principals are given as certificate-style names.
+  */
+class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
+  override protected def securityProtocol = SecurityProtocol.SSL
+  this.serverConfig.setProperty(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required")
+  override val clientPrincipal = "O=client,CN=localhost"
+  override val kafkaPrincipal = "O=server,CN=localhost"
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index e9784e0..870b9ad 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -25,6 +25,7 @@ import kafka.server._
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.{After, Before}
 
 import scala.collection.mutable.Buffer
@@ -37,6 +38,8 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
   var servers: Buffer[KafkaServer] = null
   var brokerList: String = null
   var alive: Array[Boolean] = null
+  val kafkaPrincipalType = KafkaPrincipal.USER_TYPE
+  val setClusterAcl: Option[() => Unit] = None
 
   /**
    * Implementations must override this method to return a set of 
KafkaConfigs. This method will be invoked for every
@@ -58,12 +61,27 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
   @Before
   override def setUp() {
     super.setUp
-    if(configs.size <= 0)
+    if (configs.size <= 0)
       throw new KafkaException("Must supply at least one server config.")
     servers = configs.map(TestUtils.createServer(_)).toBuffer
     brokerList = TestUtils.getBrokerListStrFromServers(servers, 
securityProtocol)
     alive = new Array[Boolean](servers.length)
     Arrays.fill(alive, true)
+    // We need to set a cluster ACL in some cases here
+    // because of the topic creation in the setup of
+    // IntegrationTestHarness. If we don't, then tests
+    // fail with a cluster action authorization exception
+    // when processing an update metadata request
+    // (controller -> broker).
+    //
+    // The following method does nothing by default, but
+    // if the test case requires setting up a cluster ACL,
+    // then it needs to be implemented.
+    setClusterAcl match {
+      case Some(f) =>
+        f()
+      case None => // Nothing to do
+    }
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala
 
b/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala
index 11d6da4..888207c 100644
--- 
a/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala
+++ 
b/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala
@@ -21,6 +21,7 @@ import kafka.api.SaslTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
 
 class SaslPlaintextTopicMetadataTest extends BaseTopicMetadataTest with 
SaslTestHarness {
+  override protected val zkSaslEnabled = false
   protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
   protected def trustStoreFile = None
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala 
b/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala
index ea15419..f313280 100644
--- a/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala
@@ -23,6 +23,7 @@ import kafka.api.SaslTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
 
 class SaslSslTopicMetadataTest extends BaseTopicMetadataTest with 
SaslTestHarness {
+  override protected val zkSaslEnabled = false
   protected def securityProtocol = SecurityProtocol.SASL_SSL
   protected lazy val trustStoreFile = Some(File.createTempFile("truststore", 
".jks"))
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala 
b/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala
index 740db37..435a0f0 100644
--- a/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala
@@ -21,6 +21,7 @@ import kafka.api.SaslTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
 
 class SaslPlaintextReplicaFetchTest extends BaseReplicaFetchTest with 
SaslTestHarness {
+  override protected val zkSaslEnabled = false
   protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
   protected def trustStoreFile = None
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala 
b/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala
index 1bcf8ac..3af8485 100644
--- a/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala
@@ -23,6 +23,7 @@ import kafka.api.SaslTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
 
 class SaslSslReplicaFetchTest extends BaseReplicaFetchTest with 
SaslTestHarness {
+  override protected val zkSaslEnabled = false
   protected def securityProtocol = SecurityProtocol.SASL_SSL
   protected lazy val trustStoreFile = Some(File.createTempFile("truststore", 
".jks"))
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index 78a74d5..cf08830 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -18,23 +18,70 @@ package kafka.utils
 
 
 object JaasTestUtils {
-  val serverContextName = "Server"
+  // ZooKeeper vals
+  val zkServerContextName = "Server"
+  val zkClientContextName = "Client"
   val userSuperPasswd = "adminpasswd"
   val user = "fpj"
   val userPasswd = "fpjsecret"
-  val module = "org.apache.zookeeper.server.auth.DigestLoginModule"
-
+  val zkModule = "org.apache.zookeeper.server.auth.DigestLoginModule"
+  //Kafka vals
+  val kafkaServerContextName = "KafkaServer"
+  val kafkaClientContextName = "KafkaClient"
+  val kafkaServerPrincipal = "client@EXAMPLE.COM"
+  val kafkaClientPrincipal = "kafka/localhost@EXAMPLE.COM"
+  val kafkaModule = "com.sun.security.auth.module.Krb5LoginModule"
+  
   def genZkFile: String = {
-    val jaasFile = java.io.File.createTempFile("jaas", "conf")
+    val jaasFile = java.io.File.createTempFile("jaas", ".conf")
+    val jaasOutputStream = new java.io.FileOutputStream(jaasFile)
+    writeZkToOutputStream(jaasOutputStream)
+    jaasOutputStream.close()
+    jaasFile.deleteOnExit()
+    jaasFile.getCanonicalPath
+  }
+  
+  def genKafkaFile(keytabLocation: String): String = {
+    val jaasFile = java.io.File.createTempFile("jaas", ".conf")
     val jaasOutputStream = new java.io.FileOutputStream(jaasFile)
-    jaasOutputStream.write(s"Server {\n\t$module required\n".getBytes)
+    writeKafkaToOutputStream(jaasOutputStream, keytabLocation)
+    jaasOutputStream.close()
+    jaasFile.deleteOnExit()
+    jaasFile.getCanonicalPath
+  }
+  
+  def genZkAndKafkaFile(keytabLocation: String): String = {
+    val jaasFile = java.io.File.createTempFile("jaas", ".conf")
+    val jaasOutputStream = new java.io.FileOutputStream(jaasFile)
+    writeKafkaToOutputStream(jaasOutputStream, keytabLocation)
+    jaasOutputStream.write("\n\n".getBytes)
+    writeZkToOutputStream(jaasOutputStream)
+    jaasOutputStream.close()
+    jaasFile.deleteOnExit()
+    jaasFile.getCanonicalPath
+  }
+  
+  private def writeZkToOutputStream(jaasOutputStream: java.io.FileOutputStream) {
+    jaasOutputStream.write(s"$zkServerContextName {\n\t$zkModule required\n".getBytes)
     jaasOutputStream.write(s"""\tuser_super="$userSuperPasswd"\n""".getBytes)
     jaasOutputStream.write(s"""\tuser_$user="$userPasswd";\n};\n\n""".getBytes)
-    jaasOutputStream.write(s"""Client {\n\t$module required\n""".getBytes)
+    jaasOutputStream.write(s"""$zkClientContextName {\n\t$zkModule required\n""".getBytes)
     jaasOutputStream.write(s"""\tusername="$user"\n""".getBytes)
     jaasOutputStream.write(s"""\tpassword="$userPasswd";\n};""".getBytes)
-    jaasOutputStream.close()
-    jaasFile.deleteOnExit()
-    jaasFile.getCanonicalPath
+  }
+  
+  private def writeKafkaToOutputStream(jaasOutputStream: java.io.FileOutputStream, keytabLocation: String) {
+    jaasOutputStream.write(s"$kafkaClientContextName {\n\t$kafkaModule required debug=true\n".getBytes)
+    jaasOutputStream.write(s"\tuseKeyTab=true\n".getBytes)
+    jaasOutputStream.write(s"\tstoreKey=true\n".getBytes)
+    jaasOutputStream.write(s"""\tserviceName="kafka"\n""".getBytes)
+    jaasOutputStream.write(s"""\tkeyTab="$keytabLocation"\n""".getBytes)
+    jaasOutputStream.write(s"""\tprincipal="$kafkaServerPrincipal";\n};\n\n""".getBytes)
+    jaasOutputStream.write(s"""$kafkaServerContextName {\n\t$kafkaModule required debug=true\n""".getBytes)
+    jaasOutputStream.write(s"\tuseKeyTab=true\n".getBytes)
+    jaasOutputStream.write(s"\tstoreKey=true\n".getBytes)
+    jaasOutputStream.write(s"""\tserviceName="kafka"\n""".getBytes)
+    jaasOutputStream.write(s"""\tkeyTab="$keytabLocation"\n""".getBytes)
+    jaasOutputStream.write(s"""\tprincipal="$kafkaClientPrincipal";\n};""".getBytes)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a8acdf9/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 88c91f4..7c928c5 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -29,13 +29,11 @@ import charset.Charset
 
 import kafka.security.auth.{Resource, Authorizer, Acl}
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.security.ssl.SslFactory
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.test.TestSslUtils
 
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
-import org.I0Itec.zkclient.{ZkClient, ZkConnection}
 import kafka.server._
 import kafka.producer._
 import kafka.message._
@@ -217,7 +215,7 @@ object TestUtils extends Logging {
     props.put("controlled.shutdown.retry.backoff.ms", "100")
 
     if (protocolAndPorts.exists { case (protocol, _) => 
usesSslTransportLayer(protocol) })
-      props.putAll(sslConfigs(Mode.SERVER, true, trustStoreFile, 
s"server$nodeId"))
+      props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, 
s"server$nodeId"))
 
     interBrokerSecurityProtocol.foreach { protocol =>
       props.put(KafkaConfig.InterBrokerSecurityProtocolProp, protocol.name)
@@ -444,7 +442,7 @@ object TestUtils extends Logging {
                               certAlias: String): Properties = {
     val props = new Properties
     if (usesSslTransportLayer(securityProtocol))
-      props.putAll(sslConfigs(mode, false, trustStoreFile, certAlias))
+      props.putAll(sslConfigs(mode, true, trustStoreFile, certAlias))
     props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
securityProtocol.name)
     props
   }
@@ -473,7 +471,7 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, 
metadataFetchTimeout.toString)
     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
-
+        
     /* Only use these if not already set */
     val defaultProps = Map(
       ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> "100",
@@ -486,7 +484,14 @@ object TestUtils extends Logging {
       if (!producerProps.containsKey(key)) producerProps.put(key, value)
     }
 
-    producerProps.putAll(producerSecurityConfigs(securityProtocol, 
trustStoreFile))
+    /*
+     * It uses CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to determine 
whether
+     * securityConfigs has been invoked already. For example, we need to
+     * invoke it before this call in IntegrationTestHarness, otherwise the
+     * SSL client auth fails.
+     */
+    if(!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
+      producerProps.putAll(producerSecurityConfigs(securityProtocol, trustStoreFile))
 
     new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
   }
@@ -503,29 +508,42 @@ object TestUtils extends Logging {
    * Create a new consumer with a few pre-configured properties.
    */
   def createNewConsumer(brokerList: String,
-                        groupId: String,
+                        groupId: String = "group",
                         autoOffsetReset: String = "earliest",
                         partitionFetchSize: Long = 4096L,
                         partitionAssignmentStrategy: String = 
classOf[RangeAssignor].getName,
                         sessionTimeout: Int = 30000,
                         securityProtocol: SecurityProtocol,
-                        trustStoreFile: Option[File] = None) : 
KafkaConsumer[Array[Byte],Array[Byte]] = {
+                        trustStoreFile: Option[File] = None,
+                        props: Option[Properties] = None) : 
KafkaConsumer[Array[Byte],Array[Byte]] = {
     import org.apache.kafka.clients.consumer.ConsumerConfig
 
-    val consumerProps= new Properties()
+    val consumerProps = props.getOrElse(new Properties())
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset)
     consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
partitionFetchSize.toString)
 
-    consumerProps.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
-    consumerProps.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
-    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
partitionAssignmentStrategy)
-    consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
sessionTimeout.toString)
+    val defaultProps = Map(
+      ConsumerConfig.RETRY_BACKOFF_MS_CONFIG -> "100",
+      ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG -> "200",
+      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> 
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
+      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
+      ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> 
partitionAssignmentStrategy,
+      ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> sessionTimeout.toString,
+      ConsumerConfig.GROUP_ID_CONFIG -> groupId)
 
-    consumerProps.putAll(consumerSecurityConfigs(securityProtocol, 
trustStoreFile))
+    defaultProps.foreach { case (key, value) =>
+      if (!consumerProps.containsKey(key)) consumerProps.put(key, value)
+    }
+
+    /*
+     * It uses CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to determine 
whether
+     * securityConfigs has been invoked already. For example, we need to
+     * invoke it before this call in IntegrationTestHarness, otherwise the
+     * SSL client auth fails.
+     */
+    if(!consumerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
+      consumerProps.putAll(consumerSecurityConfigs(securityProtocol, trustStoreFile))
 
     new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
   }
@@ -990,12 +1008,7 @@ object TestUtils extends Logging {
     }
 
 
-    val sslConfigs = {
-      if (mode == Mode.SERVER)
-        TestSslUtils.createSslConfig(true, true, mode, trustStore, certAlias)
-      else
-        TestSslUtils.createSslConfig(clientCert, false, mode, trustStore, 
certAlias)
-    }
+    val sslConfigs = TestSslUtils.createSslConfig(clientCert, true, mode, 
trustStore, certAlias)
 
     val sslProps = new Properties()
     sslConfigs.foreach { case (k, v) => sslProps.put(k, v) }

Reply via email to