Repository: kafka
Updated Branches:
  refs/heads/trunk cd427c9b9 -> 5b375d7bf


http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index f998d82..36b52fd 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -323,6 +323,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime, threadNamePr
     def networkClientControlledShutdown(retries: Int): Boolean = {
       val metadataUpdater = new ManualMetadataUpdater()
       val networkClient = {
+        val channelBuilder = ChannelBuilders.create(
+          config.interBrokerSecurityProtocol,
+          Mode.CLIENT,
+          LoginType.SERVER,
+          config.values,
+          config.saslMechanismInterBrokerProtocol,
+          config.saslInterBrokerHandshakeRequestEnable)
         val selector = new Selector(
           NetworkReceive.UNLIMITED,
           config.connectionsMaxIdleMs,
@@ -331,7 +338,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime, threadNamePr
           "kafka-server-controlled-shutdown",
           Map.empty.asJava,
           false,
-          ChannelBuilders.create(config.interBrokerSecurityProtocol, 
Mode.CLIENT, LoginType.SERVER, config.values)
+          channelBuilder
         )
         new NetworkClient(
           selector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 26838ca..84f2e12 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -73,6 +73,14 @@ class ReplicaFetcherThread(name: String,
   // as the metrics tag to avoid metric name conflicts with
   // more than one fetcher thread to the same broker
   private val networkClient = {
+    val channelBuilder = ChannelBuilders.create(
+      brokerConfig.interBrokerSecurityProtocol,
+      Mode.CLIENT,
+      LoginType.SERVER,
+      brokerConfig.values,
+      brokerConfig.saslMechanismInterBrokerProtocol,
+      brokerConfig.saslInterBrokerHandshakeRequestEnable
+    )
     val selector = new Selector(
       NetworkReceive.UNLIMITED,
       brokerConfig.connectionsMaxIdleMs,
@@ -81,7 +89,7 @@ class ReplicaFetcherThread(name: String,
       "replica-fetcher",
       Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> 
fetcherId.toString).asJava,
       false,
-      ChannelBuilders.create(brokerConfig.interBrokerSecurityProtocol, 
Mode.CLIENT, LoginType.SERVER, brokerConfig.values)
+      channelBuilder
     )
     new NetworkClient(
       selector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 56dae76..23fcfa6 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -26,6 +26,7 @@ import org.junit.{Before, Test}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Buffer
 import org.apache.kafka.common.internals.TopicConstants
+import org.apache.kafka.clients.producer.KafkaProducer
 
 /**
  * Integration tests for the new consumer that cover basic usage as well as 
server failures
@@ -268,10 +269,14 @@ abstract class BaseConsumerTest extends 
IntegrationTestHarness with Logging {
   }
 
   protected def sendRecords(numRecords: Int, tp: TopicPartition) {
+    sendRecords(this.producers(0), numRecords, tp)
+  }
+
+  protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], 
numRecords: Int, tp: TopicPartition) {
     (0 until numRecords).foreach { i =>
-      this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), 
i.toLong, s"key $i".getBytes, s"value $i".getBytes))
+      producer.send(new ProducerRecord(tp.topic(), tp.partition(), i.toLong, 
s"key $i".getBytes, s"value $i".getBytes))
     }
-    this.producers(0).flush()
+    producer.flush()
   }
 
   protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], 
Array[Byte]],

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 49ce748..15eeb63 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -41,7 +41,7 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
     val numServers = 2
     overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
     TestUtils.createBrokerConfigs(numServers, zkConnect, false, 
interBrokerSecurityProtocol = Some(securityProtocol),
-      trustStoreFile = trustStoreFile).map(KafkaConfig.fromProps(_, 
overridingProps))
+      trustStoreFile = trustStoreFile, saslProperties = 
saslProperties).map(KafkaConfig.fromProps(_, overridingProps))
   }
 
   private var consumer1: SimpleConsumer = null
@@ -72,7 +72,7 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
 
   private def createProducer(brokerList: String, retries: Int = 0, lingerMs: 
Long = 0, props: Option[Properties] = None): 
KafkaProducer[Array[Byte],Array[Byte]] = {
     val producer = TestUtils.createNewProducer(brokerList, securityProtocol = 
securityProtocol, trustStoreFile = trustStoreFile,
-      retries = retries, lingerMs = lingerMs, props = props)
+      saslProperties = saslProperties, retries = retries, lingerMs = lingerMs, 
props = props)
     producers += producer
     producer
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index e2314b3..870caca 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -141,9 +141,9 @@ trait EndToEndAuthorizationTest extends 
IntegrationTestHarness with SaslSetup {
   override def setUp {
     securityProtocol match {
       case SecurityProtocol.SSL =>
-        startSasl(ZkSasl)
+        startSasl(ZkSasl, null, null)
       case _ =>
-        startSasl(Both)
+        startSasl(Both, List("GSSAPI"), List("GSSAPI"))
     }
     super.setUp
     AclCommand.main(topicBrokerReadAclArgs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index d0680b8..de05c9c 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -46,15 +46,15 @@ trait IntegrationTestHarness extends KafkaServerTestHarness 
{
 
   override def generateConfigs() = {
     val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, 
interBrokerSecurityProtocol = Some(securityProtocol),
-      trustStoreFile = trustStoreFile)
+      trustStoreFile = trustStoreFile, saslProperties = saslProperties)
     cfgs.foreach(_.putAll(serverConfig))
     cfgs.map(KafkaConfig.fromProps)
   }
 
   @Before
   override def setUp() {
-    val producerSecurityProps = 
TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile)
-    val consumerSecurityProps = 
TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile)
+    val producerSecurityProps = 
TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile, 
saslProperties)
+    val consumerSecurityProps = 
TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile, 
saslProperties)
     super.setUp()
     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
@@ -66,11 +66,13 @@ trait IntegrationTestHarness extends KafkaServerTestHarness 
{
       producers += TestUtils.createNewProducer(brokerList,
                                                securityProtocol = 
this.securityProtocol,
                                                trustStoreFile = 
this.trustStoreFile,
+                                               saslProperties = 
this.saslProperties,
                                                props = Some(producerConfig))
     for (i <- 0 until consumerCount) {
       consumers += TestUtils.createNewConsumer(brokerList,
                                                securityProtocol = 
this.securityProtocol,
                                                trustStoreFile = 
this.trustStoreFile,
+                                               saslProperties = 
this.saslProperties,
                                                props = Some(consumerConfig))
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 8dbb80b..b22ccde 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -355,7 +355,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, 
CompressionType.GZIP.name)
     producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, 
Long.MaxValue.toString)
     val producer = TestUtils.createNewProducer(brokerList, securityProtocol = 
securityProtocol, trustStoreFile = trustStoreFile,
-        retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps))
+        saslProperties = saslProperties, retries = 0, lingerMs = 
Long.MaxValue, props = Some(producerProps))
     (0 until numRecords).foreach { i =>
       producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key 
$i".getBytes, s"value $i".getBytes))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
new file mode 100644
index 0000000..d203245
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import java.io.File
+import org.apache.kafka.common.protocol.SecurityProtocol
+import kafka.server.KafkaConfig
+import org.junit.Test
+import kafka.utils.TestUtils
+import scala.collection.JavaConverters._
+
+class SaslMultiMechanismConsumerTest extends BaseConsumerTest with 
SaslTestHarness {
+  override protected val zkSaslEnabled = true
+  override protected val kafkaClientSaslMechanism = "PLAIN"
+  override protected val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN")
+  override protected def allKafkaClientSaslMechanisms = List("PLAIN", "GSSAPI")
+  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+  override protected lazy val trustStoreFile = 
Some(File.createTempFile("truststore", ".jks"))
+  override protected val saslProperties = 
Some(kafkaSaslProperties(kafkaClientSaslMechanism, kafkaServerSaslMechanisms))
+
+  @Test
+  def testMultipleBrokerMechanisms() {
+
+    val plainSaslProducer = producers(0)
+    val plainSaslConsumer = consumers(0)
+
+    val gssapiSaslProperties = kafkaSaslProperties("GSSAPI", 
kafkaServerSaslMechanisms)
+    val gssapiSaslProducer = TestUtils.createNewProducer(brokerList,
+                                                         securityProtocol = 
this.securityProtocol,
+                                                         trustStoreFile = 
this.trustStoreFile,
+                                                         saslProperties = 
Some(gssapiSaslProperties))
+    producers += gssapiSaslProducer
+    val gssapiSaslConsumer = TestUtils.createNewConsumer(brokerList,
+                                                         securityProtocol = 
this.securityProtocol,
+                                                         trustStoreFile = 
this.trustStoreFile,
+                                                         saslProperties = 
Some(gssapiSaslProperties))
+    consumers += gssapiSaslConsumer
+    val numRecords = 1000
+    var startingOffset = 0
+
+    // Test SASL/PLAIN producer and consumer
+    sendRecords(plainSaslProducer, numRecords, tp)
+    plainSaslConsumer.assign(List(tp).asJava)
+    plainSaslConsumer.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords = 
numRecords, startingOffset = startingOffset)
+    val plainCommitCallback = new CountConsumerCommitCallback()
+    plainSaslConsumer.commitAsync(plainCommitCallback)
+    awaitCommitCallback(plainSaslConsumer, plainCommitCallback)
+    startingOffset += numRecords
+
+    // Test SASL/GSSAPI producer and consumer
+    sendRecords(gssapiSaslProducer, numRecords, tp)
+    gssapiSaslConsumer.assign(List(tp).asJava)
+    gssapiSaslConsumer.seek(tp, startingOffset)
+    consumeAndVerifyRecords(consumer = gssapiSaslConsumer, numRecords = 
numRecords, startingOffset = startingOffset)
+    val gssapiCommitCallback = new CountConsumerCommitCallback()
+    gssapiSaslConsumer.commitAsync(gssapiCommitCallback)
+    awaitCommitCallback(gssapiSaslConsumer, gssapiCommitCallback)
+    startingOffset += numRecords
+
+    // Test SASL/PLAIN producer and SASL/GSSAPI consumer
+    sendRecords(plainSaslProducer, numRecords, tp)
+    gssapiSaslConsumer.assign(List(tp).asJava)
+    gssapiSaslConsumer.seek(tp, startingOffset)
+    consumeAndVerifyRecords(consumer = gssapiSaslConsumer, numRecords = 
numRecords, startingOffset = startingOffset)
+    startingOffset += numRecords
+
+    // Test SASL/GSSAPI producer and SASL/PLAIN consumer
+    sendRecords(gssapiSaslProducer, numRecords, tp)
+    plainSaslConsumer.assign(List(tp).asJava)
+    plainSaslConsumer.seek(tp, startingOffset)
+    consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords = 
numRecords, startingOffset = startingOffset)
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
new file mode 100644
index 0000000..687cfc3
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -0,0 +1,27 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import java.io.File
+import org.apache.kafka.common.protocol.SecurityProtocol
+import kafka.server.KafkaConfig
+
+class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with 
SaslTestHarness {
+  override protected val zkSaslEnabled = true
+  override protected val kafkaClientSaslMechanism = "PLAIN"
+  override protected val kafkaServerSaslMechanisms = 
List(kafkaClientSaslMechanism)
+  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  override protected lazy val trustStoreFile = 
Some(File.createTempFile("truststore", ".jks"))
+  override protected val saslProperties = 
Some(kafkaSaslProperties(kafkaClientSaslMechanism, kafkaServerSaslMechanisms))
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala 
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 967cae1..acc86e3 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -19,11 +19,10 @@ package kafka.api
 
 import java.io.File
 import javax.security.auth.login.Configuration
-
 import kafka.security.minikdc.MiniKdc
 import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.security.kerberos.LoginManager
+import org.apache.kafka.common.security.authenticator.LoginManager
 
 /*
  * Implements an enumeration for the modes enabled here:
@@ -40,35 +39,42 @@ case object Both extends SaslSetupMode
 trait SaslSetup {
   private val workDir = TestUtils.tempDir()
   private val kdcConf = MiniKdc.createConfig
-  private val kdc = new MiniKdc(kdcConf, workDir)
+  private var kdc: MiniKdc = null
 
-  def startSasl(mode: SaslSetupMode = Both) {
+  def startSasl(mode: SaslSetupMode = Both, kafkaServerSaslMechanisms: 
List[String], kafkaClientSaslMechanisms: List[String]) {
     // Important if tests leak consumers, producers or brokers
     LoginManager.closeAll()
-    val (serverKeytabFile, clientKeytabFile) = 
createKeytabsAndSetConfiguration(mode)
-    kdc.start()
-    kdc.createPrincipal(serverKeytabFile, "kafka/localhost")
-    kdc.createPrincipal(clientKeytabFile, "client")
+    val hasKerberos = mode != ZkSasl && 
(kafkaClientSaslMechanisms.contains("GSSAPI") || 
kafkaServerSaslMechanisms.contains("GSSAPI"))
+    if (hasKerberos) {
+      val serverKeytabFile = TestUtils.tempFile()
+      val clientKeytabFile = TestUtils.tempFile()
+      setJaasConfiguration(mode, kafkaServerSaslMechanisms, 
kafkaClientSaslMechanisms, Some(serverKeytabFile), Some(clientKeytabFile))
+      kdc = new MiniKdc(kdcConf, workDir)
+      kdc.start()
+      kdc.createPrincipal(serverKeytabFile, "kafka/localhost")
+      kdc.createPrincipal(clientKeytabFile, "client")
+    } else {
+      setJaasConfiguration(mode, kafkaServerSaslMechanisms, 
kafkaClientSaslMechanisms)
+    }
     if (mode == Both || mode == ZkSasl)
       System.setProperty("zookeeper.authProvider.1", 
"org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
   }
 
-  protected def createKeytabsAndSetConfiguration(mode: SaslSetupMode): (File, 
File) = {
-    val serverKeytabFile = TestUtils.tempFile()
-    val clientKeytabFile = TestUtils.tempFile()
+  protected def setJaasConfiguration(mode: SaslSetupMode, 
kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanisms: 
List[String], 
+      serverKeytabFile: Option[File] = None, clientKeytabFile: Option[File] = 
None) {
     val jaasFile = mode match {
       case ZkSasl => JaasTestUtils.writeZkFile()
-      case KafkaSasl => JaasTestUtils.writeKafkaFile(serverKeytabFile, 
clientKeytabFile)
-      case Both => JaasTestUtils.writeZkAndKafkaFiles(serverKeytabFile, 
clientKeytabFile)
+      case KafkaSasl => 
JaasTestUtils.writeKafkaFile(kafkaServerSaslMechanisms, 
kafkaClientSaslMechanisms, serverKeytabFile, clientKeytabFile)
+      case Both => 
JaasTestUtils.writeZkAndKafkaFiles(kafkaServerSaslMechanisms, 
kafkaClientSaslMechanisms, serverKeytabFile, clientKeytabFile)
     }
     // This will cause a reload of the Configuration singleton when 
`getConfiguration` is called
     Configuration.setConfiguration(null)
     System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile)
-    (serverKeytabFile, clientKeytabFile)
   }
 
   def closeSasl() {
-    kdc.stop()
+    if (kdc != null)
+      kdc.stop()
     // Important if tests leak consumers, producers or brokers
     LoginManager.closeAll()
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
index b4ae74f..5531919 100644
--- a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
@@ -13,17 +13,27 @@
 package kafka.api
 
 import kafka.zk.ZooKeeperTestHarness
+import kafka.server.KafkaConfig
 import org.junit.{After, Before}
+import java.util.Properties
+import scala.collection.JavaConverters._
+import org.apache.kafka.common.config.SaslConfigs
 
 trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup {
   protected val zkSaslEnabled: Boolean
+  protected val kafkaClientSaslMechanism = "GSSAPI"
+  protected val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
+
+  // Override this list to enable client login modules for multiple mechanisms 
for testing
+  // of multi-mechanism brokers with clients using different mechanisms in a 
single JVM
+  protected def allKafkaClientSaslMechanisms = List(kafkaClientSaslMechanism)
 
   @Before
   override def setUp() {
     if (zkSaslEnabled)
-      startSasl(Both)
+      startSasl(Both, kafkaServerSaslMechanisms, allKafkaClientSaslMechanisms)
     else
-      startSasl(KafkaSasl)
+      startSasl(KafkaSasl, kafkaServerSaslMechanisms, 
allKafkaClientSaslMechanisms)
     super.setUp
   }
 
@@ -33,4 +43,12 @@ trait SaslTestHarness extends ZooKeeperTestHarness with 
SaslSetup {
     closeSasl()
   }
 
+  def kafkaSaslProperties(kafkaClientSaslMechanism: String, 
kafkaServerSaslMechanisms: List[String]) = {
+    val props = new Properties
+    props.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism)
+    props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, 
kafkaClientSaslMechanism)
+    props.put(SaslConfigs.SASL_ENABLED_MECHANISMS, 
kafkaServerSaslMechanisms.asJava)
+    props
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 2ca64f2..8e8ae8b 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -19,7 +19,6 @@ package kafka.integration
 
 import java.io.File
 import java.util.Arrays
-
 import kafka.common.KafkaException
 import kafka.server._
 import kafka.utils.{CoreUtils, TestUtils}
@@ -27,8 +26,8 @@ import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.{After, Before}
-
 import scala.collection.mutable.Buffer
+import java.util.Properties
 
 /**
  * A test harness that brings up some number of broker nodes
@@ -57,6 +56,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
 
   protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
   protected def trustStoreFile: Option[File] = None
+  protected def saslProperties: Option[Properties] = None
 
   @Before
   override def setUp() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index fa240d2..f8476cd 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -550,6 +550,8 @@ class KafkaConfigTest {
         case KafkaConfig.SslCipherSuitesProp => // ignore string
 
         //Sasl Configs
+        case KafkaConfig.SaslMechanismInterBrokerProtocolProp => // ignore
+        case KafkaConfig.SaslEnabledMechanismsProp =>
         case KafkaConfig.SaslKerberosServiceNameProp => // ignore string
         case KafkaConfig.SaslKerberosKinitCmdProp =>
         case KafkaConfig.SaslKerberosTicketRenewWindowFactorProp =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index a14cd3f..7c4b951 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -20,16 +20,14 @@ import java.io.{File, BufferedWriter, FileWriter}
 
 object JaasTestUtils {
 
-  case class Krb5LoginModule(contextName: String,
-                             useKeyTab: Boolean,
+  case class Krb5LoginModule(useKeyTab: Boolean,
                              storeKey: Boolean,
                              keyTab: String,
                              principal: String,
                              debug: Boolean,
                              serviceName: Option[String]) {
-    def toJaasSection: JaasSection = {
-      JaasSection(
-        contextName,
+    def toJaasModule: JaasModule = {
+      JaasModule(
         "com.sun.security.auth.module.Krb5LoginModule",
         debug = debug,
         entries = Map(
@@ -42,15 +40,38 @@ object JaasTestUtils {
     }
   }
 
-  case class JaasSection(contextName: String,
-                         moduleName: String,
-                         debug: Boolean,
-                         entries: Map[String, String]) {
+  case class PlainLoginModule(username: String,
+                              password: String,
+                              debug: Boolean = false,
+                              validUsers: Map[String, String] = Map.empty) {
+    def toJaasModule: JaasModule = {
+      JaasModule(
+        "org.apache.kafka.common.security.plain.PlainLoginModule",
+        debug = debug,
+        entries = Map(
+          "username" -> username,
+          "password" -> password
+        ) ++ validUsers.map { case (user, pass) => (s"user_$user"-> pass)}
+      )
+    }
+  }
+
+  case class JaasModule(moduleName: String,
+                        debug: Boolean,
+                        entries: Map[String, String]) {
     override def toString: String = {
-      s"""|$contextName {
-          |  $moduleName required
+      s"""$moduleName required
           |  debug=$debug
           |  ${entries.map { case (k, v) => s"""$k="$v"""" }.mkString("", "\n| 
 ", ";")}
+          |"""
+    }
+  }
+
+  class JaasSection(contextName: String,
+                    jaasModule: Seq[JaasModule]) {
+    override def toString: String = {
+      s"""|$contextName {
+          |  ${jaasModule.mkString("\n  ")}
           |};
           |""".stripMargin
     }
@@ -67,6 +88,11 @@ object JaasTestUtils {
   private val KafkaServerPrincipal = "kafka/[email protected]"
   private val KafkaClientContextName = "KafkaClient"
   private val KafkaClientPrincipal = "[email protected]"
+  
+  private val KafkaPlainUser = "testuser"
+  private val KafkaPlainPassword = "testuser-secret"
+  private val KafkaPlainAdmin = "admin"
+  private val KafkaPlainAdminPassword = "admin-secret"
 
   def writeZkFile(): String = {
     val jaasFile = TestUtils.tempFile()
@@ -74,43 +100,65 @@ object JaasTestUtils {
     jaasFile.getCanonicalPath
   }
 
-  def writeKafkaFile(serverKeyTabLocation: File, clientKeyTabLocation: File): String = {
+  def writeKafkaFile(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanisms: List[String], serverKeyTabLocation: Option[File], clientKeyTabLocation: Option[File]): String = {
     val jaasFile = TestUtils.tempFile()
-    writeToFile(jaasFile, kafkaSections(serverKeyTabLocation, clientKeyTabLocation))
+    val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanisms, clientKeyTabLocation))
+    writeToFile(jaasFile, kafkaSections)
     jaasFile.getCanonicalPath
   }
 
-  def writeZkAndKafkaFiles(serverKeyTabLocation: File, clientKeyTabLocation: File): String = {
+  def writeZkAndKafkaFiles(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanisms: List[String], serverKeyTabLocation: Option[File], clientKeyTabLocation: Option[File]): String = {
     val jaasFile = TestUtils.tempFile()
-    writeToFile(jaasFile, kafkaSections(serverKeyTabLocation, clientKeyTabLocation) ++ zkSections)
+    val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanisms, clientKeyTabLocation))
+    writeToFile(jaasFile, kafkaSections ++ zkSections)
     jaasFile.getCanonicalPath
   }
 
   private def zkSections: Seq[JaasSection] = Seq(
-    JaasSection(ZkServerContextName, ZkModule, false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)),
-    JaasSection(ZkClientContextName, ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword))
+    new JaasSection(ZkServerContextName, Seq(JaasModule(ZkModule, false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))),
+    new JaasSection(ZkClientContextName, Seq(JaasModule(ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword))))
   )
 
-  private def kafkaSections(serverKeytabLocation: File, clientKeytabLocation: File): Seq[JaasSection] = {
-    Seq(
-      Krb5LoginModule(
-        KafkaServerContextName,
-        useKeyTab = true,
-        storeKey = true,
-        keyTab = serverKeytabLocation.getAbsolutePath,
-        principal = KafkaServerPrincipal,
-        debug = true,
-        serviceName = Some("kafka")),
-      Krb5LoginModule(
-        KafkaClientContextName,
-        useKeyTab = true,
-        storeKey = true,
-        keyTab = clientKeytabLocation.getAbsolutePath,
-        principal = KafkaClientPrincipal,
-        debug = true,
-        serviceName = Some("kafka")
-      )
-    ).map(_.toJaasSection)
+  private def kafkaServerSection(mechanisms: List[String], keytabLocation: Option[File]): JaasSection = {
+    val modules = mechanisms.map {
+      case "GSSAPI" =>
+        Krb5LoginModule(
+          useKeyTab = true,
+          storeKey = true,
+          keyTab = keytabLocation.getOrElse(throw new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath,
+          principal = KafkaServerPrincipal,
+          debug = true,
+          serviceName = Some("kafka")).toJaasModule
+      case "PLAIN" =>
+        PlainLoginModule(
+          KafkaPlainAdmin,
+          KafkaPlainAdminPassword,
+          debug = false,
+          Map(KafkaPlainAdmin -> KafkaPlainAdminPassword, KafkaPlainUser -> KafkaPlainPassword)).toJaasModule
+      case mechanism => throw new IllegalArgumentException("Unsupported server mechanism " + mechanism)
+    }
+    new JaasSection(KafkaServerContextName, modules)
+  }
+
+  private def kafkaClientSection(mechanisms: List[String], keytabLocation: Option[File]): JaasSection = {
+    val modules = mechanisms.map {
+      case "GSSAPI" =>
+        Krb5LoginModule(
+          useKeyTab = true,
+          storeKey = true,
+          keyTab = keytabLocation.getOrElse(throw new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath,
+          principal = KafkaClientPrincipal,
+          debug = true,
+          serviceName = Some("kafka")
+        ).toJaasModule
+      case "PLAIN" =>
+        PlainLoginModule(
+          KafkaPlainUser,
+          KafkaPlainPassword
+        ).toJaasModule
+      case mechanism => throw new IllegalArgumentException("Unsupported client mechanism " + mechanism)
+    }
+    new JaasSection(KafkaClientContextName, modules)
   }
 
   private def jaasSectionsToString(jaasSections: Seq[JaasSection]): String =

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 6bd6c63..7df87fc 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -26,12 +26,10 @@ import java.util.{Collections, Properties, Random}
 import java.security.cert.X509Certificate
 import javax.net.ssl.X509TrustManager
 import charset.Charset
-
 import kafka.security.auth.{Acl, Authorizer, Resource}
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.test.TestSslUtils
-
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import kafka.server._
 import kafka.producer._
@@ -146,6 +144,7 @@ object TestUtils extends Logging {
     enableDeleteTopic: Boolean = false,
     interBrokerSecurityProtocol: Option[SecurityProtocol] = None,
     trustStoreFile: Option[File] = None,
+    saslProperties: Option[Properties] = None,
     enablePlaintext: Boolean = true,
     enableSsl: Boolean = false,
     enableSaslPlaintext: Boolean = false,
@@ -153,7 +152,7 @@ object TestUtils extends Logging {
     rackInfo: Map[Int, String] = Map()): Seq[Properties] = {
     (0 until numConfigs).map { node =>
       createBrokerConfig(node, zkConnect, enableControlledShutdown, 
enableDeleteTopic, RandomPort,
-        interBrokerSecurityProtocol, trustStoreFile, enablePlaintext = 
enablePlaintext, enableSsl = enableSsl,
+        interBrokerSecurityProtocol, trustStoreFile, saslProperties, 
enablePlaintext = enablePlaintext, enableSsl = enableSsl,
         enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = 
enableSaslSsl, rack = rackInfo.get(node))
     }
   }
@@ -173,6 +172,7 @@ object TestUtils extends Logging {
     port: Int = RandomPort,
     interBrokerSecurityProtocol: Option[SecurityProtocol] = None,
     trustStoreFile: Option[File] = None,
+    saslProperties: Option[Properties] = None,
     enablePlaintext: Boolean = true,
     enableSaslPlaintext: Boolean = false, saslPlaintextPort: Int = RandomPort,
     enableSsl: Boolean = false, sslPort: Int = RandomPort,
@@ -211,6 +211,9 @@ object TestUtils extends Logging {
     if (protocolAndPorts.exists { case (protocol, _) => 
usesSslTransportLayer(protocol) })
       props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, 
s"server$nodeId"))
 
+    if (protocolAndPorts.exists { case (protocol, _) => usesSaslTransportLayer(protocol) })
+      props.putAll(saslConfigs(saslProperties))
+
     interBrokerSecurityProtocol.foreach { protocol =>
       props.put(KafkaConfig.InterBrokerSecurityProtocolProp, protocol.name)
     }
@@ -440,16 +443,19 @@ object TestUtils extends Logging {
   private def securityConfigs(mode: Mode,
                               securityProtocol: SecurityProtocol,
                               trustStoreFile: Option[File],
-                              certAlias: String): Properties = {
+                              certAlias: String,
+                              saslProperties: Option[Properties]): Properties = {
     val props = new Properties
     if (usesSslTransportLayer(securityProtocol))
       props.putAll(sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, 
trustStoreFile, certAlias))
+    if (usesSaslTransportLayer(securityProtocol))
+      props.putAll(saslConfigs(saslProperties))
     props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
securityProtocol.name)
     props
   }
 
-  def producerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File]): Properties =
-    securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "producer")
+  def producerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
+    securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "producer", saslProperties)
 
   /**
    * Create a (new) producer with a few pre-configured properties.
@@ -463,6 +469,7 @@ object TestUtils extends Logging {
                         requestTimeoutMs: Long = 10 * 1024L,
                         securityProtocol: SecurityProtocol = 
SecurityProtocol.PLAINTEXT,
                         trustStoreFile: Option[File] = None,
+                        saslProperties: Option[Properties] = None,
                         keySerializer: Serializer[K] = new ByteArraySerializer,
                         valueSerializer: Serializer[V] = new 
ByteArraySerializer,
                         props: Option[Properties] = None): KafkaProducer[K, V] 
= {
@@ -493,7 +500,7 @@ object TestUtils extends Logging {
      * SSL client auth fails.
      */
     if 
(!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
-      producerProps.putAll(producerSecurityConfigs(securityProtocol, 
trustStoreFile))
+      producerProps.putAll(producerSecurityConfigs(securityProtocol, 
trustStoreFile, saslProperties))
 
     new KafkaProducer[K, V](producerProps, keySerializer, valueSerializer)
   }
@@ -503,8 +510,13 @@ object TestUtils extends Logging {
     case _ => false
   }
 
-  def consumerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File]): Properties =
-    securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "consumer")
+  private def usesSaslTransportLayer(securityProtocol: SecurityProtocol): Boolean = securityProtocol match {
+    case SecurityProtocol.SASL_PLAINTEXT | SecurityProtocol.SASL_SSL => true
+    case _ => false
+  }
+
+  def consumerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
+    securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "consumer", saslProperties)
 
   /**
    * Create a new consumer with a few pre-configured properties.
@@ -517,6 +529,7 @@ object TestUtils extends Logging {
                         sessionTimeout: Int = 30000,
                         securityProtocol: SecurityProtocol,
                         trustStoreFile: Option[File] = None,
+                        saslProperties: Option[Properties] = None,
                         props: Option[Properties] = None) : 
KafkaConsumer[Array[Byte],Array[Byte]] = {
     import org.apache.kafka.clients.consumer.ConsumerConfig
 
@@ -545,7 +558,7 @@ object TestUtils extends Logging {
      * SSL client auth fails.
      */
     
if(!consumerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
-      consumerProps.putAll(consumerSecurityConfigs(securityProtocol, 
trustStoreFile))
+      consumerProps.putAll(consumerSecurityConfigs(securityProtocol, 
trustStoreFile, saslProperties))
 
     new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
   }
@@ -1058,6 +1071,13 @@ object TestUtils extends Logging {
     sslProps
   }
 
+  def saslConfigs(saslProperties: Option[Properties]): Properties = {
+    saslProperties match {
+      case Some(properties) => properties
+      case None => new Properties
+    }
+  }
+
   // a X509TrustManager to trust self-signed certs for unit tests.
   def trustAllCerts: X509TrustManager = {
     val trustManager = new X509TrustManager() {

Reply via email to