http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6217302..cf1a5a6 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -19,12 +19,14 @@ package kafka.server
 
 import java.util.Properties
 
+import kafka.api.ApiVersion
+import kafka.cluster.EndPoint
 import kafka.consumer.ConsumerConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, 
MessageSet}
 import kafka.utils.Utils
 import org.apache.kafka.common.config.ConfigDef
-
-import scala.collection.{JavaConversions, Map}
+import org.apache.kafka.common.protocol.SecurityProtocol
+import scala.collection.{immutable, JavaConversions, Map}
 
 object Defaults {
   /** ********* Zookeeper Configuration ***********/
@@ -101,6 +103,8 @@ object Defaults {
   val LeaderImbalancePerBrokerPercentage = 10
   val LeaderImbalanceCheckIntervalSeconds = 300
   val UncleanLeaderElectionEnable = true
+  val InterBrokerSecurityProtocol = SecurityProtocol.PLAINTEXT.toString
+  val InterBrokerProtocolVersion = ApiVersion.latestVersion.toString
 
   /** ********* Controlled shutdown configuration ***********/
   val ControlledShutdownMaxRetries = 3
@@ -142,8 +146,10 @@ object KafkaConfig {
   /** ********* Socket Server Configuration ***********/
   val PortProp = "port"
   val HostNameProp = "host.name"
+  val ListenersProp = "listeners"
   val AdvertisedHostNameProp: String = "advertised.host.name"
   val AdvertisedPortProp = "advertised.port"
+  val AdvertisedListenersProp = "advertised.listeners"
   val SocketSendBufferBytesProp = "socket.send.buffer.bytes"
   val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
   val SocketRequestMaxBytesProp = "socket.request.max.bytes"
@@ -207,6 +213,8 @@ object KafkaConfig {
   val LeaderImbalancePerBrokerPercentageProp = 
"leader.imbalance.per.broker.percentage"
   val LeaderImbalanceCheckIntervalSecondsProp = 
"leader.imbalance.check.interval.seconds"
   val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
+  val InterBrokerSecurityProtocolProp = "security.inter.broker.protocol"
+  val InterBrokerProtocolVersionProp = "inter.broker.protocol.version"
   /** ********* Controlled shutdown configuration ***********/
   val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
   val ControlledShutdownRetryBackoffMsProp = 
"controlled.shutdown.retry.backoff.ms"
@@ -246,6 +254,12 @@ object KafkaConfig {
   /** ********* Socket Server Configuration ***********/
   val PortDoc = "the port to listen and accept connections on"
   val HostNameDoc = "hostname of broker. If this is set, it will only bind to 
this address. If this is not set, it will bind to all interfaces"
+  val ListenersDoc = "Listener List - Comma-separated list of URIs we will 
listen on and their protocols.\n" +
+          " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
+          " Leave hostname empty to bind to default interface.\n" +
+          " Examples of legal listener lists:\n" +
+          " PLAINTEXT://myhost:9092,TRACE://:9091\n" +
+          " PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093\n"
   val AdvertisedHostNameDoc = "Hostname to publish to ZooKeeper for clients to 
use. In IaaS environments, this may " +
     "need to be different from the interface to which the broker binds. If 
this is not set, " +
     "it will use the value for \"host.name\" if configured. Otherwise " +
@@ -253,6 +267,9 @@ object KafkaConfig {
   val AdvertisedPortDoc = "The port to publish to ZooKeeper for clients to 
use. In IaaS environments, this may " +
     "need to be different from the port to which the broker binds. If this is 
not set, " +
     "it will publish the same port that the broker binds to."
+  val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients 
to use, if different than the listeners above." +
+          " In IaaS environments, this may need to be different from the 
interface to which the broker binds." +
+          " If this is not set, the value for \"listeners\" will be used."
   val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever 
sockets"
   val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever 
sockets"
   val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket 
request"
@@ -319,6 +336,10 @@ object KafkaConfig {
   val LeaderImbalancePerBrokerPercentageDoc = "The ratio of leader imbalance 
allowed per broker. The controller would trigger a leader balance if it goes 
above this value per broker. The value is specified in percentage."
   val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the 
partition rebalance check is triggered by the controller"
   val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas 
not in the ISR set to be elected as leader as a last resort, even though doing 
so may result in data loss"
+  val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate 
between brokers. Defaults to plain text."
+  val InterBrokerProtocolVersionDoc = "Specify which version of the 
inter-broker protocol will be used.\n" +
+          " This is typically bumped after all brokers were upgraded to a new 
version.\n" +
+          " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 
0.8.2.0, 0.8.2.1, 0.8.3, 0.8.3.0. Check ApiVersion for the full list."
   /** ********* Controlled shutdown configuration ***********/
   val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for 
multiple reasons. This determines the number of retries when such failure 
happens"
   val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system 
needs time to recover from the state that caused the previous failure 
(Controller fail over, replica lag etc). This config determines the amount of 
time to wait before retrying."
@@ -350,7 +371,6 @@ object KafkaConfig {
     import ConfigDef.ValidString._
     import ConfigDef.Type._
     import ConfigDef.Importance._
-    import java.util.Arrays.asList
 
     new ConfigDef()
 
@@ -372,8 +392,10 @@ object KafkaConfig {
       /** ********* Socket Server Configuration ***********/
       .define(PortProp, INT, Defaults.Port, HIGH, PortDoc)
       .define(HostNameProp, STRING, Defaults.HostName, HIGH, HostNameDoc)
+      .define(ListenersProp, STRING, HIGH, ListenersDoc, false)
       .define(AdvertisedHostNameProp, STRING, HIGH, AdvertisedHostNameDoc, 
false)
       .define(AdvertisedPortProp, INT, HIGH, AdvertisedPortDoc, false)
+      .define(AdvertisedListenersProp, STRING, HIGH, AdvertisedListenersDoc, 
false)
       .define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, 
HIGH, SocketSendBufferBytesDoc)
       .define(SocketReceiveBufferBytesProp, INT, 
Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc)
       .define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, 
atLeast(1), HIGH, SocketRequestMaxBytesDoc)
@@ -439,7 +461,8 @@ object KafkaConfig {
       .define(LeaderImbalancePerBrokerPercentageProp, INT, 
Defaults.LeaderImbalancePerBrokerPercentage, HIGH, 
LeaderImbalancePerBrokerPercentageDoc)
       .define(LeaderImbalanceCheckIntervalSecondsProp, INT, 
Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, 
LeaderImbalanceCheckIntervalSecondsDoc)
       .define(UncleanLeaderElectionEnableProp, BOOLEAN, 
Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc)
-
+      .define(InterBrokerSecurityProtocolProp, STRING, 
Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc)
+      .define(InterBrokerProtocolVersionProp, STRING, 
Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc)
       /** ********* Controlled shutdown configuration ***********/
       .define(ControlledShutdownMaxRetriesProp, INT, 
Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc)
       .define(ControlledShutdownRetryBackoffMsProp, INT, 
Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, 
ControlledShutdownRetryBackoffMsDoc)
@@ -490,8 +513,10 @@ object KafkaConfig {
       /** ********* Socket Server Configuration ***********/
       port = parsed.get(PortProp).asInstanceOf[Int],
       hostName = parsed.get(HostNameProp).asInstanceOf[String],
+      _listeners = 
Option(parsed.get(ListenersProp)).map(_.asInstanceOf[String]),
       _advertisedHostName = 
Option(parsed.get(AdvertisedHostNameProp)).map(_.asInstanceOf[String]),
       _advertisedPort = 
Option(parsed.get(AdvertisedPortProp)).map(_.asInstanceOf[Int]),
+      _advertisedListeners = 
Option(parsed.get(AdvertisedListenersProp)).map(_.asInstanceOf[String]),
       socketSendBufferBytes = 
parsed.get(SocketSendBufferBytesProp).asInstanceOf[Int],
       socketReceiveBufferBytes = 
parsed.get(SocketReceiveBufferBytesProp).asInstanceOf[Int],
       socketRequestMaxBytes = 
parsed.get(SocketRequestMaxBytesProp).asInstanceOf[Int],
@@ -557,7 +582,8 @@ object KafkaConfig {
       leaderImbalancePerBrokerPercentage = 
parsed.get(LeaderImbalancePerBrokerPercentageProp).asInstanceOf[Int],
       leaderImbalanceCheckIntervalSeconds = 
parsed.get(LeaderImbalanceCheckIntervalSecondsProp).asInstanceOf[Int],
       uncleanLeaderElectionEnable = 
parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean],
-
+      interBrokerSecurityProtocol = 
SecurityProtocol.valueOf(parsed.get(InterBrokerSecurityProtocolProp).asInstanceOf[String]),
+      interBrokerProtocolVersion =  
ApiVersion(parsed.get(InterBrokerProtocolVersionProp).asInstanceOf[String]),
       /** ********* Controlled shutdown configuration ***********/
       controlledShutdownMaxRetries = 
parsed.get(ControlledShutdownMaxRetriesProp).asInstanceOf[Int],
       controlledShutdownRetryBackoffMs = 
parsed.get(ControlledShutdownRetryBackoffMsProp).asInstanceOf[Int],
@@ -628,8 +654,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration 
***********/
                   /** ********* Socket Server Configuration ***********/
                   val port: Int = Defaults.Port,
                   val hostName: String = Defaults.HostName,
+                  private val _listeners: Option[String] = None,
                   private val _advertisedHostName: Option[String] = None,
                   private val _advertisedPort: Option[Int] = None,
+                  private val _advertisedListeners: Option[String] = None,
                   val socketSendBufferBytes: Int = 
Defaults.SocketSendBufferBytes,
                   val socketReceiveBufferBytes: Int = 
Defaults.SocketReceiveBufferBytes,
                   val socketRequestMaxBytes: Int = 
Defaults.SocketRequestMaxBytes,
@@ -697,6 +725,8 @@ class KafkaConfig(/** ********* Zookeeper Configuration 
***********/
                   val leaderImbalancePerBrokerPercentage: Int = 
Defaults.LeaderImbalancePerBrokerPercentage,
                   val leaderImbalanceCheckIntervalSeconds: Int = 
Defaults.LeaderImbalanceCheckIntervalSeconds,
                   val uncleanLeaderElectionEnable: Boolean = 
Defaults.UncleanLeaderElectionEnable,
+                  val interBrokerSecurityProtocol: SecurityProtocol = 
SecurityProtocol.valueOf(Defaults.InterBrokerSecurityProtocol),
+                  val interBrokerProtocolVersion: ApiVersion = 
ApiVersion(Defaults.InterBrokerProtocolVersion),
 
                   /** ********* Controlled shutdown configuration ***********/
                   val controlledShutdownMaxRetries: Int = 
Defaults.ControlledShutdownMaxRetries,
@@ -721,8 +751,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration 
***********/
 
   val zkConnectionTimeoutMs: Int = 
_zkConnectionTimeoutMs.getOrElse(zkSessionTimeoutMs)
 
+  val listeners = getListeners()
   val advertisedHostName: String = _advertisedHostName.getOrElse(hostName)
   val advertisedPort: Int = _advertisedPort.getOrElse(port)
+  val advertisedListeners = getAdvertisedListeners()
   val logDirs = Utils.parseCsvList(_logDirs.getOrElse(_logDir))
 
   val logRollTimeMillis = _logRollTimeMillis.getOrElse(60 * 60 * 1000L * 
logRollTimeHours)
@@ -731,14 +763,6 @@ class KafkaConfig(/** ********* Zookeeper Configuration 
***********/
 
   val logFlushIntervalMs = 
_logFlushIntervalMs.getOrElse(logFlushSchedulerIntervalMs)
 
-  private def getMap(propName: String, propValue: String): Map[String, String] 
= {
-    try {
-      Utils.parseCsvMap(propValue)
-    } catch {
-      case e: Exception => throw new IllegalArgumentException("Error parsing 
configuration property '%s': %s".format(propName, e.getMessage))
-    }
-  }
-
   val maxConnectionsPerIpOverrides: Map[String, Int] =
     getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, 
_maxConnectionsPerIpOverrides).map { case (k, v) => (k, v.toInt)}
 
@@ -754,6 +778,56 @@ class KafkaConfig(/** ********* Zookeeper Configuration 
***********/
     )
   }
 
+  /**
+   * Parses `propValue` with Utils.parseCsvMap.
+   * Any exception raised by the parser is rethrown as an IllegalArgumentException
+   * whose message names the offending property.
+   *
+   * @param propName  configuration key; used only when building the error message
+   * @param propValue raw property text handed to Utils.parseCsvMap
+   * @return the map produced by Utils.parseCsvMap
+   */
+  private def getMap(propName: String, propValue: String): Map[String, String] =
+    try Utils.parseCsvMap(propValue)
+    catch {
+      case e: Exception =>
+        val reason = "Error parsing configuration property '%s': %s".format(propName, e.getMessage)
+        throw new IllegalArgumentException(reason)
+    }
+
+  /**
+   * Checks that no two entries of a listener list share a port or a protocol.
+   *
+   * The list is split with Utils.parseCsvList and every entry is turned into an
+   * EndPoint; a failure in either step is reported as an IllegalArgumentException
+   * quoting the original listener string. Duplicate ports or protocols fail the
+   * `require` checks below.
+   *
+   * @param listeners the raw listener list as supplied in the configuration
+   */
+  private def validateUniquePortAndProtocol(listeners: String): Unit = {
+    val parsed =
+      try {
+        Utils.parseCsvList(listeners).map(uri => EndPoint.createEndPoint(uri))
+      } catch {
+        case e: Exception =>
+          val reason = "Error creating broker listeners from '%s': %s".format(listeners, e.getMessage)
+          throw new IllegalArgumentException(reason)
+      }
+    val portCount = parsed.map(_.port).distinct.size
+    val protocolCount = parsed.map(_.protocolType).distinct.size
+
+    // a distinct count smaller than the endpoint count means some value was repeated
+    require(portCount == parsed.size, "Each listener must have a different port")
+    require(protocolCount == parsed.size, "Each listener must have a different protocol")
+  }
+
+  // Resolves the endpoints this broker listens on.
+  // An explicit "listeners" value is validated and parsed as given.
+  // Without one, a single PLAINTEXT endpoint is built from hostName and port, which keeps
+  // configurations that only set the older host/port settings (or rely on their defaults) working.
+  private def getListeners(): immutable.Map[SecurityProtocol, EndPoint] =
+    _listeners match {
+      case Some(configured) =>
+        validateUniquePortAndProtocol(configured)
+        Utils.listenerListToEndPoints(configured)
+      case None =>
+        Utils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port)
+    }
+
+  // Resolves the endpoints that are advertised to clients, in order of precedence:
+  //  1. an explicit "advertised.listeners" value, validated and parsed as given;
+  //  2. a single PLAINTEXT endpoint built from advertisedHostName / advertisedPort when either
+  //     advertised host or advertised port was set (the other falls back to hostName / port);
+  //  3. otherwise the same endpoints that getListeners() produces.
+  private def getAdvertisedListeners(): immutable.Map[SecurityProtocol, EndPoint] =
+    _advertisedListeners match {
+      case Some(configured) =>
+        validateUniquePortAndProtocol(configured)
+        Utils.listenerListToEndPoints(configured)
+      case None if _advertisedHostName.isDefined || _advertisedPort.isDefined =>
+        Utils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort)
+      case None =>
+        getListeners()
+    }
+
+
+
   validateValues()
 
   private def validateValues() {
@@ -797,8 +871,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration 
***********/
     /** ********* Socket Server Configuration ***********/
     props.put(PortProp, port.toString)
     props.put(HostNameProp, hostName)
+    _listeners.foreach(props.put(ListenersProp, _))
     _advertisedHostName.foreach(props.put(AdvertisedHostNameProp, _))
     _advertisedPort.foreach(value => props.put(AdvertisedPortProp, 
value.toString))
+    _advertisedListeners.foreach(props.put(AdvertisedListenersProp, _))
     props.put(SocketSendBufferBytesProp, socketSendBufferBytes.toString)
     props.put(SocketReceiveBufferBytesProp, socketReceiveBufferBytes.toString)
     props.put(SocketRequestMaxBytesProp, socketRequestMaxBytes.toString)
@@ -865,6 +941,9 @@ class KafkaConfig(/** ********* Zookeeper Configuration 
***********/
     props.put(LeaderImbalancePerBrokerPercentageProp, 
leaderImbalancePerBrokerPercentage.toString)
     props.put(LeaderImbalanceCheckIntervalSecondsProp, 
leaderImbalanceCheckIntervalSeconds.toString)
     props.put(UncleanLeaderElectionEnableProp, 
uncleanLeaderElectionEnable.toString)
+    props.put(InterBrokerSecurityProtocolProp, 
interBrokerSecurityProtocol.toString)
+    props.put(InterBrokerProtocolVersionProp, 
interBrokerProtocolVersion.toString)
+
 
     /** ********* Controlled shutdown configuration ***********/
     props.put(ControlledShutdownMaxRetriesProp, 
controlledShutdownMaxRetries.toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 7907987..861b7f6 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -17,7 +17,9 @@
 
 package kafka.server
 
+import kafka.cluster.EndPoint
 import kafka.utils._
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
 import java.net.InetAddress
@@ -31,9 +33,8 @@ import java.net.InetAddress
  * Right now our definition of health is fairly naive. If we register in zk we 
are healthy, otherwise
  * we are dead.
  */
-class KafkaHealthcheck(private val brokerId: Int, 
-                       private val advertisedHost: String, 
-                       private val advertisedPort: Int,
+class KafkaHealthcheck(private val brokerId: Int,
+                       private val advertisedEndpoints: Map[SecurityProtocol, 
EndPoint],
                        private val zkSessionTimeoutMs: Int,
                        private val zkClient: ZkClient) extends Logging {
 
@@ -49,13 +50,19 @@ class KafkaHealthcheck(private val brokerId: Int,
    * Register this broker as "alive" in zookeeper
    */
   def register() {
-    val advertisedHostName = 
-      if(advertisedHost == null || advertisedHost.trim.isEmpty) 
-        InetAddress.getLocalHost.getCanonicalHostName 
-      else
-        advertisedHost
     val jmxPort = System.getProperty("com.sun.management.jmxremote.port", 
"-1").toInt
-    ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, 
advertisedPort, zkSessionTimeoutMs, jmxPort)
+    // Endpoints without a host get this machine's canonical host name.
+    // The map is built strictly with `map` rather than `mapValues`: `mapValues` only returns a
+    // lazy view, so the substitution (and the host-name lookup) would be re-run on every read,
+    // and the legacy host below and the endpoint map would come from separate lookups.
+    val updatedEndpoints = advertisedEndpoints.map { case (protocol, endpoint) =>
+      if (endpoint.host == null || endpoint.host.trim.isEmpty)
+        protocol -> EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType)
+      else
+        protocol -> endpoint
+    }
+
+    // the default host and port are here for compatibility with older clients
+    // only PLAINTEXT is supported as default
+    // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
+    val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null))
+    ZkUtils.registerBrokerInZk(zkClient, brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, zkSessionTimeoutMs, jmxPort)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 10ea77a..9df2cf4 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -25,10 +25,11 @@ import kafka.utils._
 import java.util.concurrent._
 import atomic.{AtomicInteger, AtomicBoolean}
 import java.io.File
+
 import collection.mutable
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
-import kafka.cluster.Broker
+import kafka.cluster.{EndPoint, Broker}
 import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
 import kafka.common.{ErrorMapping, InconsistentBrokerIdException, 
GenerateBrokerIdException}
 import kafka.network.{Receive, BlockingChannel, SocketServer}
@@ -117,57 +118,61 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
         this.logIdent = "[Kafka Server " + config.brokerId + "], "
 
         socketServer = new SocketServer(config.brokerId,
-          config.hostName,
-          config.port,
-          config.numNetworkThreads,
-          config.queuedMaxRequests,
-          config.socketSendBufferBytes,
-          config.socketReceiveBufferBytes,
-          config.socketRequestMaxBytes,
-          config.maxConnectionsPerIp,
-          config.connectionsMaxIdleMs,
-          config.maxConnectionsPerIpOverrides)
-        socketServer.startup()
-
-        /* start replica manager */
-        replicaManager = new ReplicaManager(config, time, zkClient, 
kafkaScheduler, logManager, isShuttingDown)
-        replicaManager.startup()
-
-        /* start offset manager */
-        offsetManager = createOffsetManager()
-
-        /* start kafka controller */
-        kafkaController = new KafkaController(config, zkClient, brokerState)
-        kafkaController.startup()
-
-        /* start kafka coordinator */
-        consumerCoordinator = new ConsumerCoordinator(config, zkClient)
-        consumerCoordinator.startup()
-
-        /* start processing requests */
-        apis = new KafkaApis(socketServer.requestChannel, replicaManager, 
offsetManager, consumerCoordinator,
-          kafkaController, zkClient, config.brokerId, config, metadataCache)
-        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, 
socketServer.requestChannel, apis, config.numIoThreads)
-        brokerState.newState(RunningAsBroker)
-
-        Mx4jLoader.maybeLoad()
-
-        /* start topic config manager */
-        topicConfigManager = new TopicConfigManager(zkClient, logManager)
-        topicConfigManager.startup()
-
-        /* tell everyone we are alive */
-        val advertisedPort = if (config.advertisedPort != 0) 
config.advertisedPort else socketServer.boundPort()
-        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, 
config.advertisedHostName, advertisedPort, config.zkSessionTimeoutMs, zkClient)
-        kafkaHealthcheck.startup()
-
-        /* register broker metrics */
-        registerStats()
-
-        shutdownLatch = new CountDownLatch(1)
-        startupComplete.set(true)
-        isStartingUp.set(false)
-        info("started")
+                                        config.listeners,
+                                        config.numNetworkThreads,
+                                        config.queuedMaxRequests,
+                                        config.socketSendBufferBytes,
+                                        config.socketReceiveBufferBytes,
+                                        config.socketRequestMaxBytes,
+                                        config.maxConnectionsPerIp,
+                                        config.connectionsMaxIdleMs,
+                                        config.maxConnectionsPerIpOverrides)
+          socketServer.startup()
+
+          /* start replica manager */
+          replicaManager = new ReplicaManager(config, time, zkClient, 
kafkaScheduler, logManager, isShuttingDown)
+          replicaManager.startup()
+
+          /* start offset manager */
+          offsetManager = createOffsetManager()
+
+          /* start kafka controller */
+          kafkaController = new KafkaController(config, zkClient, brokerState)
+          kafkaController.startup()
+
+          /* start kafka coordinator */
+          consumerCoordinator = new ConsumerCoordinator(config, zkClient)
+          consumerCoordinator.startup()
+
+          /* start processing requests */
+          apis = new KafkaApis(socketServer.requestChannel, replicaManager, 
offsetManager, consumerCoordinator,
+            kafkaController, zkClient, config.brokerId, config, metadataCache)
+          requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, 
socketServer.requestChannel, apis, config.numIoThreads)
+          brokerState.newState(RunningAsBroker)
+
+          Mx4jLoader.maybeLoad()
+
+          /* start topic config manager */
+          topicConfigManager = new TopicConfigManager(zkClient, logManager)
+          topicConfigManager.startup()
+
+          /* tell everyone we are alive */
+          val listeners = config.advertisedListeners.map {case(protocol, 
endpoint) =>
+            if (endpoint.port == 0)
+              (protocol, EndPoint(endpoint.host, socketServer.boundPort(), 
endpoint.protocolType))
+            else
+              (protocol, endpoint)
+          }
+          kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, 
config.zkSessionTimeoutMs, zkClient)
+          kafkaHealthcheck.startup()
+
+          /* register broker metrics */
+          registerStats()
+
+          shutdownLatch = new CountDownLatch(1)
+          startupComplete.set(true)
+          isStartingUp.set(false)
+          info("started")
       }
     }
     catch {
@@ -243,7 +248,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
                 if (channel != null) {
                   channel.disconnect()
                 }
-                channel = new BlockingChannel(broker.host, broker.port,
+                channel = new 
BlockingChannel(broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).host,
+                  
broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).port,
                   BlockingChannel.UseDefaultBufferSize,
                   BlockingChannel.UseDefaultBufferSize,
                   config.controllerSocketTimeoutMs)
@@ -421,7 +427,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
     * <li> config has broker.id and meta.properties contains broker.id if they 
don't match throws InconsistentBrokerIdException
     * <li> config has broker.id and there is no meta.properties file, creates 
new meta.properties and stores broker.id
     * <ol>
-    * @returns A brokerId.
+    * @return A brokerId.
     */
   private def getBrokerId: Int =  {
     var brokerId = config.brokerId

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala 
b/core/src/main/scala/kafka/server/MetadataCache.scala
index 6aef6e4..008f02b 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,10 +17,13 @@
 
 package kafka.server
 
+import kafka.cluster.{BrokerEndpoint,Broker}
+import kafka.common.{ErrorMapping, ReplicaNotAvailableException, 
LeaderNotAvailableException}
+import kafka.common.TopicAndPartition
+
 import kafka.api._
-import kafka.common._
-import kafka.cluster.Broker
 import kafka.controller.KafkaController.StateChangeLogger
+import org.apache.kafka.common.protocol.SecurityProtocol
 import scala.collection.{Seq, Set, mutable}
 import kafka.utils.Logging
 import kafka.utils.Utils._
@@ -39,7 +42,8 @@ private[server] class MetadataCache(brokerId: Int) extends 
Logging {
 
   this.logIdent = "[Kafka Metadata Cache on broker %d] ".format(brokerId)
 
-  def getTopicMetadata(topics: Set[String]) = {
+  def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol) = {
+
     val isAllTopics = topics.isEmpty
     val topicsRequested = if(isAllTopics) cache.keySet else topics
     val topicResponses: mutable.ListBuffer[TopicMetadata] = new 
mutable.ListBuffer[TopicMetadata]
@@ -50,18 +54,21 @@ private[server] class MetadataCache(brokerId: Int) extends 
Logging {
           val partitionMetadata = partitionStateInfos.map {
             case (partitionId, partitionState) =>
               val replicas = partitionState.allReplicas
-              val replicaInfo: Seq[Broker] = 
replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
-              var leaderInfo: Option[Broker] = None
-              var isrInfo: Seq[Broker] = Nil
+              val replicaInfo: Seq[BrokerEndpoint] = 
replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != 
null).toSeq.map(_.getBrokerEndPoint(protocol))
+              var leaderInfo: Option[BrokerEndpoint] = None
+              var leaderBrokerInfo: Option[Broker] = None
+              var isrInfo: Seq[BrokerEndpoint] = Nil
               val leaderIsrAndEpoch = 
partitionState.leaderIsrAndControllerEpoch
               val leader = leaderIsrAndEpoch.leaderAndIsr.leader
               val isr = leaderIsrAndEpoch.leaderAndIsr.isr
               val topicPartition = TopicAndPartition(topic, partitionId)
               try {
-                leaderInfo = aliveBrokers.get(leader)
-                if (!leaderInfo.isDefined)
+                leaderBrokerInfo = aliveBrokers.get(leader)
+                if (!leaderBrokerInfo.isDefined)
                   throw new LeaderNotAvailableException("Leader not available 
for %s".format(topicPartition))
-                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != 
null)
+                else
+                  leaderInfo = 
Some(leaderBrokerInfo.get.getBrokerEndPoint(protocol))
+                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != 
null).map(_.getBrokerEndPoint(protocol))
                 if (replicaInfo.size < replicas.size)
                   throw new ReplicaNotAvailableException("Replica information 
not available for following brokers: " +
                     
replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 351dbba..f0a2a5b 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -17,13 +17,13 @@
 
 package kafka.server
 
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val 
replicaMgr: ReplicaManager)
         extends AbstractFetcherManager("ReplicaFetcherManager on broker " + 
brokerConfig.brokerId,
                                        "Replica", 
brokerConfig.numReplicaFetchers) {
 
+  /**
+   * Builds the fetcher thread for one source broker endpoint.
+   * The thread is named "ReplicaFetcherThread-<fetcherId>-<sourceBroker.id>" and is handed
+   * this manager's broker config and replica manager.
+   *
+   * @param fetcherId    fetcher index; used here only in the thread name
+   * @param sourceBroker endpoint of the broker to fetch from
+   * @return a new ReplicaFetcherThread
+   */
-  override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): 
AbstractFetcherThread = {
+  override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndpoint): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, 
sourceBroker.id), sourceBroker, brokerConfig, replicaMgr)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 96faa7b..b2196c8 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -18,14 +18,14 @@
 package kafka.server
 
 import kafka.admin.AdminUtils
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
 import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{OffsetRequest, FetchResponsePartitionData}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
 
 class ReplicaFetcherThread(name:String,
-                           sourceBroker: Broker,
+                           sourceBroker: BrokerEndpoint,
                            brokerConfig: KafkaConfig,
                            replicaMgr: ReplicaManager)
   extends AbstractFetcherThread(name = name,

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6e43622..144a15e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,7 +19,7 @@ package kafka.server
 import kafka.api._
 import kafka.common._
 import kafka.utils._
-import kafka.cluster.{Broker, Partition, Replica}
+import kafka.cluster.{BrokerEndpoint, Partition, Replica}
 import kafka.log.{LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
@@ -684,7 +684,7 @@ class ReplicaManager(val config: KafkaConfig,
    * the error message will be set on each partition since we do not know which partition caused it
    */
   private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
-                            leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
+                            leaders: Set[BrokerEndpoint], correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
                             offsetManager: OffsetManager) {
     partitionState.foreach { state =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index d1e7c43..d2bac85 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -24,11 +24,11 @@ import kafka.utils._
 import kafka.consumer.SimpleConsumer
 import kafka.api.{OffsetFetchResponse, OffsetFetchRequest, OffsetRequest}
 import kafka.common.{OffsetMetadataAndError, ErrorMapping, BrokerNotAvailableException, TopicAndPartition}
+import org.apache.kafka.common.protocol.SecurityProtocol
 import scala.collection._
 import kafka.client.ClientUtils
 import kafka.network.BlockingChannel
 import kafka.api.PartitionOffsetRequestInfo
-import scala.Some
 import org.I0Itec.zkclient.exception.ZkNoNodeException
 
 object ConsumerOffsetChecker extends Logging {

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index ba6ddd7..d1050b4 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -18,7 +18,7 @@
 package kafka.tools
 
 import joptsimple.OptionParser
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
 import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet}
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicReference
@@ -197,7 +197,7 @@ private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, c
 private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int],
                             leadersPerBroker: Map[Int, Seq[TopicAndPartition]],
                             expectedNumFetchers: Int,
-                            brokerMap: Map[Int, Broker],
+                            brokerMap: Map[Int, BrokerEndpoint],
                             initialOffsetTime: Long,
                             reportInterval: Long) extends Logging {
   private val fetchOffsetMap = new Pool[TopicAndPartition, Long]
@@ -335,7 +335,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
   }
 }
 
-private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartitions: Iterable[TopicAndPartition],
+private class ReplicaFetcher(name: String, sourceBroker: BrokerEndpoint, topicAndPartitions: Iterable[TopicAndPartition],
                              replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int,
                              fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean)
   extends ShutdownableThread(name) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index b4f903b..7379fe3 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -22,7 +22,7 @@ import kafka.utils._
 import kafka.consumer._
 import kafka.client.ClientUtils
 import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
 import scala.collection.JavaConversions._
 import kafka.common.TopicAndPartition
 
@@ -142,8 +142,8 @@ object SimpleConsumerShell extends Logging {
     }
 
     // validating replica id and initializing target broker
-    var fetchTargetBroker: Broker = null
-    var replicaOpt: Option[Broker] = null
+    var fetchTargetBroker: BrokerEndpoint = null
+    var replicaOpt: Option[BrokerEndpoint] = null
     if(replicaId == UseLeaderReplica) {
       replicaOpt = partitionMetadataOpt.get.leader
       if(!replicaOpt.isDefined) {
@@ -167,7 +167,9 @@ object SimpleConsumerShell extends Logging {
       System.exit(1)
     }
     if (startingOffset < 0) {
-      val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout,
+      val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host,
+                                              fetchTargetBroker.port,
+                                              ConsumerConfig.SocketTimeout,
                                               ConsumerConfig.SocketBufferSize, clientId)
       try {
         startingOffset = simpleConsumer.earliestOrLatestOffset(TopicAndPartition(topic, partitionId), startingOffset,
@@ -188,8 +190,12 @@ object SimpleConsumerShell extends Logging {
 
     val replicaString = if(replicaId > 0) "leader" else "replica"
     info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]"
-                 .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
-    val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId)
+                 .format(topic, partitionId, replicaString, replicaId,
+                         fetchTargetBroker.host,
+                         fetchTargetBroker.port, startingOffset))
+    val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host,
+                                            fetchTargetBroker.port,
+                                            10000, 64*1024, clientId)
     val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
       def run() {
         var offset = startingOffset

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index 111c9a8..e82cb81 100644
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -22,6 +22,7 @@ import kafka.consumer.{SimpleConsumer, ConsumerConfig}
 import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
 import kafka.common.{TopicAndPartition, KafkaException}
 import kafka.utils.{ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, Utils}
+import org.apache.kafka.common.protocol.SecurityProtocol
 
 
 /**
@@ -65,7 +66,9 @@ object UpdateOffsetsInZK {
 
       ZkUtils.getBrokerInfo(zkClient, broker) match {
         case Some(brokerInfo) =>
-          val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024, "UpdateOffsetsInZk")
+          val consumer = new SimpleConsumer(brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host,
+                                            brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port,
+                                            10000, 100 * 1024, "UpdateOffsetsInZk")
           val topicAndPartition = TopicAndPartition(topic, partition)
           val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1)))
           val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 894a6a6..afc3b4e 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -24,9 +24,13 @@ import java.nio.channels._
 import java.util.concurrent.locks.{ReadWriteLock, Lock}
 import java.lang.management._
 import javax.management._
+
+import org.apache.kafka.common.protocol.SecurityProtocol
+
 import scala.collection._
 import scala.collection.mutable
 import java.util.Properties
+import kafka.cluster.EndPoint
 import kafka.common.KafkaException
 import kafka.common.KafkaStorageException
 
@@ -607,4 +611,9 @@ object Utils extends Logging {
       .filter{ case (k,l) => (l > 1) }
       .keys
   }
+
+  def listenerListToEndPoints(listeners: String): immutable.Map[SecurityProtocol, EndPoint] = {
+    val listenerList = parseCsvList(listeners)
+    listenerList.map(listener => EndPoint.createEndPoint(listener)).map(ep => ep.protocolType -> ep).toMap
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 7ae999e..82b0e33 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -17,13 +17,14 @@
 
 package kafka.utils
 
-import kafka.cluster.{Broker, Cluster}
+import kafka.cluster._
 import kafka.consumer.{ConsumerThreadId, TopicCount}
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
   ZkMarshallingError, ZkBadVersionException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.protocol.SecurityProtocol
 import collection._
 import kafka.api.LeaderAndIsr
 import org.apache.zookeeper.data.Stat
@@ -31,10 +32,8 @@ import kafka.admin._
 import kafka.common.{KafkaException, NoEpochForPartitionException}
 import kafka.controller.ReassignedPartitionsContext
 import kafka.controller.KafkaController
-import scala.Some
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
-import scala.collection
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -84,6 +83,10 @@ object ZkUtils extends Logging {
     brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
   }
 
+  def getAllBrokerEndPointsForChannel(zkClient: ZkClient, protocolType: SecurityProtocol): Seq[BrokerEndpoint] = {
+    getAllBrokersInCluster(zkClient).map(_.getBrokerEndPoint(protocolType))
+  }
+
   def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
     ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
   }
@@ -169,12 +172,28 @@ object ZkUtils extends Logging {
     }
   }
 
-  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) {
+  /**
+   * Register brokers with v2 json format (which includes multiple endpoints).
+   * This format also includes default endpoints for compatibility with older clients.
+   * @param zkClient
+   * @param id
+   * @param advertisedEndpoints
+   * @param timeout
+   * @param jmxPort
+   */
+  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, advertisedEndpoints: immutable.Map[SecurityProtocol, EndPoint], timeout: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val timestamp = SystemTime.milliseconds.toString
-    val brokerInfo = Json.encode(Map("version" -> 1, "host" -> host, "port" -> port, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
-    val expectedBroker = new Broker(id, host, port)
 
+    val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "endpoints"->advertisedEndpoints.values.map(_.connectionString).toArray, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
+    val expectedBroker = new Broker(id, advertisedEndpoints)
+
+    registerBrokerInZk(zkClient, brokerIdPath, brokerInfo, expectedBroker, timeout)
+
+    info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(",")))
+  }
+
+  private def registerBrokerInZk(zkClient: ZkClient, brokerIdPath: String, brokerInfo: String, expectedBroker: Broker, timeout: Int) {
     try {
       createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker,
         (brokerString: String, broker: Any) => Broker.createBroker(broker.asInstanceOf[Broker].id, brokerString).equals(broker.asInstanceOf[Broker]),
@@ -183,11 +202,10 @@ object ZkUtils extends Logging {
     } catch {
       case e: ZkNodeExistsException =>
         throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
-          + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or "
-          + "else you have shutdown this broker and restarted it faster than the zookeeper "
-          + "timeout so it appears to be re-registering.")
+                + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or "
+                + "else you have shutdown this broker and restarted it faster than the zookeeper "
+                + "timeout so it appears to be re-registering.")
     }
-    info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
   }
 
   def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index aba256d..9811a2b 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -17,8 +17,6 @@
 
 package kafka.api
 
-import java.lang.{Integer, IllegalArgumentException}
-
 import org.apache.kafka.clients.producer._
 import org.scalatest.junit.JUnit3Suite
 import org.junit.Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index a106379..9881bd3 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -3,6 +3,7 @@ package other.kafka
 import org.I0Itec.zkclient.ZkClient
 import kafka.api._
 import kafka.utils.{ShutdownableThread, ZKStringSerializer}
+import org.apache.kafka.common.protocol.SecurityProtocol
 import scala.collection._
 import kafka.client.ClientUtils
 import joptsimple.OptionParser
@@ -12,7 +13,6 @@ import scala.util.Random
 import java.io.IOException
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import java.util.concurrent.TimeUnit
-import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicInteger
 import java.nio.channels.ClosedByInterruptException
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index 4d36b8b..bc4aef3 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -65,14 +65,14 @@ class KafkaTest {
     assertEquals(2, config2.brokerId)
 
     // We should be also able to set completely new property
-    val config3 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "port=1987"))
+    val config3 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact"))
     assertEquals(1, config3.brokerId)
-    assertEquals(1987, config3.port)
+    assertEquals("compact", config3.logCleanupPolicy)
 
     // We should be also able to set several properties
-    val config4 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "port=1987", "--override", "broker.id=2"))
+    val config4 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact", "--override", "broker.id=2"))
     assertEquals(2, config4.brokerId)
-    assertEquals(1987, config4.port)
+    assertEquals("compact", config4.logCleanupPolicy)
   }
 
   @Test(expected = classOf[ExitCalled])

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 99ac923..76b8b24 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.admin
 
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
@@ -92,7 +93,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2)
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions",
+    val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testIncrementPartitions",
       2000,0).topicsMetadata
     val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1))
     val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata
@@ -117,7 +118,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas",
+    val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testManualAssignmentOfReplicas",
       2000,0).topicsMetadata
     val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2))
     val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata
@@ -141,7 +142,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6)
 
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement",
+    val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testReplicaPlacement",
       2000,0).topicsMetadata
 
     val metaDataForTopic3 = metadata.filter(p => p.topic.equals(topic3)).head

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 030faac..83910f3 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -17,18 +17,19 @@
 
 package kafka.api
 
+
+import kafka.cluster.{BrokerEndpoint, EndPoint, Broker}
+import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError}
 import kafka.common._
-import kafka.cluster.Broker
-import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.utils.SystemTime
 
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.requests._
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.common.TopicAndPartition
 
-import scala.Some
 import java.nio.ByteBuffer
 
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit._
 import org.scalatest.junit.JUnitSuite
 import junit.framework.Assert._
@@ -81,21 +82,47 @@ object SerializationTestUtils {
     TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100)
   )
 
-  private val brokers = List(new Broker(0, "localhost", 1011), new Broker(1, "localhost", 1012), new Broker(2, "localhost", 1013))
-  private val partitionMetaData0 = new PartitionMetadata(0, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 0)
-  private val partitionMetaData1 = new PartitionMetadata(1, Some(brokers.head), replicas = brokers, isr = brokers.tail, errorCode = 1)
-  private val partitionMetaData2 = new PartitionMetadata(2, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 2)
-  private val partitionMetaData3 = new PartitionMetadata(3, Some(brokers.head), replicas = brokers, isr = brokers.tail.tail, errorCode = 3)
+  private val brokers = List(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1011, SecurityProtocol.PLAINTEXT))),
+                             new Broker(1, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1012, SecurityProtocol.PLAINTEXT))),
+                             new Broker(2, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1013, SecurityProtocol.PLAINTEXT))))
+  private val brokerEndpoints = brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
+
+  private val partitionMetaData0 = new PartitionMetadata(0, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 0)
+  private val partitionMetaData1 = new PartitionMetadata(1, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail, errorCode = 1)
+  private val partitionMetaData2 = new PartitionMetadata(2, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 2)
+  private val partitionMetaData3 = new PartitionMetadata(3, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail.tail, errorCode = 3)
   private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
   private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
   private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
 
+  private val leaderAndIsr0 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.map(_.id))
+  private val leaderAndIsr1 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.tail.map(_.id))
+  private val leaderAndIsr2 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.map(_.id))
+  private val leaderAndIsr3 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.tail.map(_.id))
+
+  private val leaderIsrAndControllerEpoch0 = new LeaderIsrAndControllerEpoch(leaderAndIsr0, controllerEpoch = 0)
+  private val leaderIsrAndControllerEpoch1 = new LeaderIsrAndControllerEpoch(leaderAndIsr1, controllerEpoch = 0)
+  private val leaderIsrAndControllerEpoch2 = new LeaderIsrAndControllerEpoch(leaderAndIsr2, controllerEpoch = 0)
+  private val leaderIsrAndControllerEpoch3 = new LeaderIsrAndControllerEpoch(leaderAndIsr3, controllerEpoch = 0)
+
+  private val partitionStateInfo0 = new PartitionStateInfo(leaderIsrAndControllerEpoch0, brokers.map(_.id).toSet)
+  private val partitionStateInfo1 = new PartitionStateInfo(leaderIsrAndControllerEpoch1, brokers.map(_.id).toSet)
+  private val partitionStateInfo2 = new PartitionStateInfo(leaderIsrAndControllerEpoch2, brokers.map(_.id).toSet)
+  private val partitionStateInfo3 = new PartitionStateInfo(leaderIsrAndControllerEpoch3, brokers.map(_.id).toSet)
+
+  private val updateMetadataRequestPartitionStateInfo = collection.immutable.Map(
+    TopicAndPartition(topic1,0) -> partitionStateInfo0,
+    TopicAndPartition(topic1,1) -> partitionStateInfo1,
+    TopicAndPartition(topic1,2) -> partitionStateInfo2,
+    TopicAndPartition(topic1,3) -> partitionStateInfo3
+  )
+
   def createTestLeaderAndIsrRequest() : LeaderAndIsrRequest = {
     val leaderAndIsr1 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader1, 1, isr1, 1), 1)
     val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, isr1.toSet)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, isr2.toSet)))
-    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0, "")
+    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[BrokerEndpoint](), 0, 1, 0, "")
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
@@ -149,7 +176,7 @@ object SerializationTestUtils {
   }
 
   def createTestTopicMetadataResponse: TopicMetadataResponse = {
-    new TopicMetadataResponse(brokers, Seq(topicmetaData1, topicmetaData2), 1)
+    new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)).toVector, Seq(topicmetaData1, topicmetaData2), 1)
   }
 
   def createTestOffsetCommitRequestV2: OffsetCommitRequest = {
@@ -206,7 +233,23 @@ object SerializationTestUtils {
   }
 
   def createConsumerMetadataResponse: ConsumerMetadataResponse = {
-    ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError, 0)
+    ConsumerMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
+  }
+
+  def createUpdateMetadataRequest(versionId: Short): UpdateMetadataRequest = {
+    UpdateMetadataRequest(
+      versionId,
+      correlationId = 0,
+      clientId = "client1",
+      controllerId = 0,
+      controllerEpoch = 0,
+      partitionStateInfos = updateMetadataRequestPartitionStateInfo,
+      brokers.toSet
+    )
+  }
+
+  def createUpdateMetadataResponse: UpdateMetadataResponse = {
+    UpdateMetadataResponse( correlationId = 0, errorCode = 0)
   }
 }
 
@@ -231,6 +274,10 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
   private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
   private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
+  private val updateMetadataRequestV0 = SerializationTestUtils.createUpdateMetadataRequest(0)
+  private val updateMetadataRequestV1 = SerializationTestUtils.createUpdateMetadataRequest(1)
+  private val updateMetdataResponse = SerializationTestUtils.createUpdateMetadataResponse
+
 
   @Test
   def testSerializationAndDeserialization() {
@@ -243,6 +290,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
                                offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2,
                                offsetCommitResponse, offsetFetchRequest, offsetFetchResponse,
                                consumerMetadataRequest, consumerMetadataResponse,
+                               updateMetadataRequestV0, updateMetadataRequestV1, updateMetdataResponse,
                                consumerMetadataResponseNoCoordinator)
 
     requestsAndResponses.foreach { original =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
new file mode 100644
index 0000000..ad58eed
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+import java.nio.ByteBuffer
+
+import kafka.utils.Logging
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+
+import scala.collection.mutable
+
+class BrokerEndPointTest extends JUnit3Suite with Logging {
+
+  @Test
+  def testSerDe() = {
+
+    val endpoint = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
+    val listEndPoints = Map(SecurityProtocol.PLAINTEXT -> endpoint)
+    val origBroker = new Broker(1, listEndPoints)
+    val brokerBytes = ByteBuffer.allocate(origBroker.sizeInBytes)
+
+    origBroker.writeTo(brokerBytes)
+
+    val newBroker = 
Broker.readFrom(brokerBytes.flip().asInstanceOf[ByteBuffer])
+    assert(origBroker == newBroker)
+  }
+
+  @Test
+  def testHashAndEquals() =  {
+    val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
+    val endpoint2 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
+    val endpoint3 = new EndPoint("myhost", 1111, SecurityProtocol.PLAINTEXT)
+    val endpoint4 = new EndPoint("other", 1111, SecurityProtocol.PLAINTEXT)
+    val broker1 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint1))
+    val broker2 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint2))
+    val broker3 = new Broker(2, Map(SecurityProtocol.PLAINTEXT -> endpoint3))
+    val broker4 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint4))
+
+    assert(broker1 == broker2)
+    assert(broker1 != broker3)
+    assert(broker1 != broker4)
+    assert(broker1.hashCode() == broker2.hashCode())
+    assert(broker1.hashCode() != broker3.hashCode())
+    assert(broker1.hashCode() != broker4.hashCode())
+
+    val hashmap = new mutable.HashMap[Broker, Int]()
+    hashmap.put(broker1, 1)
+    assert(hashmap.getOrElse(broker1, -1) == 1)
+  }
+
+  @Test
+  def testFromJSON() = {
+    val brokerInfoStr = "{\"version\":2," +
+                          "\"host\":\"localhost\"," +
+                          "\"port\":9092," +
+                          "\"jmx_port\":9999," +
+                          "\"timestamp\":\"1416974968782\"," +
+                          "\"endpoints\":[\"PLAINTEXT://localhost:9092\"]}"
+    val broker = Broker.createBroker(1, brokerInfoStr)
+    assert(broker.id == 1)
+    assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host == 
"localhost")
+    assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port == 9092)
+  }
+
+  @Test
+  def testFromOldJSON() = {
+    val brokerInfoStr = 
"{\"jmx_port\":-1,\"timestamp\":\"1420485325400\",\"host\":\"172.16.8.243\",\"version\":1,\"port\":9091}"
+    val broker = Broker.createBroker(1, brokerInfoStr)
+    assert(broker.id == 1)
+    assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host == 
"172.16.8.243")
+    assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port == 9091)
+  }
+
+  @Test
+  def testBrokerEndpointFromURI() = {
+    var connectionString = "localhost:9092"
+    var endpoint = BrokerEndpoint.createBrokerEndPoint(1, connectionString)
+    assert(endpoint.host == "localhost")
+    assert(endpoint.port == 9092)
+    // also test for ipv6
+    connectionString = "[::1]:9092"
+    endpoint = BrokerEndpoint.createBrokerEndPoint(1, connectionString)
+    assert(endpoint.host == "::1")
+    assert(endpoint.port == 9092)
+  }
+
+  @Test
+  def testEndpointFromURI() = {
+    var connectionString = "PLAINTEXT://localhost:9092"
+    var endpoint = EndPoint.createEndPoint(connectionString)
+    assert(endpoint.host == "localhost")
+    assert(endpoint.port == 9092)
+    assert(endpoint.connectionString == "PLAINTEXT://localhost:9092")
+    // also test for default bind
+    connectionString = "PLAINTEXT://:9092"
+    endpoint = EndPoint.createEndPoint(connectionString)
+    assert(endpoint.host == null)
+    assert(endpoint.port == 9092)
+    assert(endpoint.connectionString == "PLAINTEXT://:9092")
+    // also test for ipv6
+    connectionString = "PLAINTEXT://[::1]:9092"
+    endpoint = EndPoint.createEndPoint(connectionString)
+    assert(endpoint.host == "::1")
+    assert(endpoint.port == 9092)
+    assert(endpoint.connectionString ==  "PLAINTEXT://[::1]:9092")
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala 
b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index ecb5a33..0dc837a 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -28,7 +28,6 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.consumer._
 import kafka.serializer._
 import kafka.producer.{KeyedMessage, Producer}
-import kafka.utils.TestUtils._
 import kafka.utils.TestUtils
 
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -37,14 +36,13 @@ class FetcherTest extends JUnit3Suite with 
KafkaServerTestHarness {
 
   val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
   val topic = "topic"
-
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
 
   var fetcher: ConsumerFetcherManager = null
 
   override def setUp() {
     super.setUp
-    createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> 
Seq(configs.head.brokerId)), servers = servers)
+    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 
-> Seq(configs.head.brokerId)), servers = servers)
 
     val cluster = new Cluster(servers.map(s => new Broker(s.config.brokerId, 
"localhost", s.boundPort())))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 28e3122..f95fb62 100644
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -18,9 +18,10 @@
 package kafka.integration
 
 import java.util.Arrays
+
 import scala.collection.mutable.Buffer
 import kafka.server._
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{TestUtils, Utils}
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common.KafkaException
@@ -47,8 +48,9 @@ trait KafkaServerTestHarness extends JUnit3Suite with 
ZooKeeperTestHarness {
   }
 
   def serverForId(id: Int) = servers.find(s => s.config.brokerId == id)
-  
+
   def bootstrapUrl = servers.map(s => s.config.hostName + ":" + 
s.boundPort()).mkString(",")
+
   
   override def setUp() {
     super.setUp

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 
b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 56b1b8c..603cf76 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -17,12 +17,13 @@
 
 package kafka.integration
 
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.AdminUtils
 import java.nio.ByteBuffer
 import junit.framework.Assert._
-import kafka.cluster.Broker
+import kafka.cluster.{BrokerEndpoint, Broker}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import kafka.server.{KafkaServer, KafkaConfig}
@@ -32,14 +33,14 @@ import kafka.client.ClientUtils
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   private var server1: KafkaServer = null
-  var brokers: Seq[Broker] = null
+  var brokerEndPoints: Seq[BrokerEndpoint] = null
 
   override def setUp() {
     super.setUp()
     val props = createBrokerConfigs(1, zkConnect)
     val configs = props.map(KafkaConfig.fromProps)
     server1 = TestUtils.createServer(configs.head)
-    brokers = Seq(new Broker(server1.config.brokerId, server1.config.hostName, 
server1.boundPort()))
+    brokerEndPoints = Seq(new Broker(server1.config.brokerId, 
server1.config.hostName, 
server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
   }
 
   override def tearDown() {
@@ -68,7 +69,7 @@ class TopicMetadataTest extends JUnit3Suite with 
ZooKeeperTestHarness {
     val topic = "test"
     createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, 
servers = Seq(server1))
 
-    var topicsMetadata = 
ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), 
brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
       2000,0).topicsMetadata
     assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
     assertEquals(ErrorMapping.NoError, 
topicsMetadata.head.partitionsMetadata.head.errorCode)
@@ -88,7 +89,7 @@ class TopicMetadataTest extends JUnit3Suite with 
ZooKeeperTestHarness {
     createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, 
servers = Seq(server1))
 
     // issue metadata request with empty list of topics
-    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, 
"TopicMetadataTest-testGetAllTopicMetadata",
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, 
brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata",
       2000, 0).topicsMetadata
     assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
     assertEquals(2, topicsMetadata.size)
@@ -107,7 +108,7 @@ class TopicMetadataTest extends JUnit3Suite with 
ZooKeeperTestHarness {
   def testAutoCreateTopic {
     // auto create topic
     val topic = "testAutoCreateTopic"
-    var topicsMetadata = 
ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testAutoCreateTopic",
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), 
brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
       2000,0).topicsMetadata
     assertEquals(ErrorMapping.LeaderNotAvailableCode, 
topicsMetadata.head.errorCode)
     assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
@@ -119,7 +120,7 @@ class TopicMetadataTest extends JUnit3Suite with 
ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0)
 
     // retry the metadata for the auto created topic
-    topicsMetadata = 
ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
+    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), 
brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
       2000,0).topicsMetadata
     assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
     assertEquals(ErrorMapping.NoError, 
topicsMetadata.head.partitionsMetadata.head.errorCode)

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index ac4c5b9..bd9a409 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -32,7 +32,7 @@ class LogTest extends JUnitSuite {
   var logDir: File = null
   val time = new MockTime(0)
   var config: KafkaConfig = null
-  val logConfig = LogConfig()
+  val logConfig = LogConfig()  
 
   @Before
   def setUp() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 79a806c..95d5621 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -19,6 +19,8 @@ package kafka.network;
 
 import java.net._
 import java.io._
+import kafka.cluster.EndPoint
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit._
 import org.scalatest.junit.JUnitSuite
 import java.util.Random
@@ -35,8 +37,8 @@ import scala.collection.Map
 class SocketServerTest extends JUnitSuite {
 
   val server: SocketServer = new SocketServer(0,
-                                              host = null,
-                                              port = 0,
+                                              Map(SecurityProtocol.PLAINTEXT 
-> EndPoint(null, 0, SecurityProtocol.PLAINTEXT),
+                                                  SecurityProtocol.TRACE -> 
EndPoint(null, 0, SecurityProtocol.TRACE)),
                                               numProcessorThreads = 1,
                                               maxQueuedRequests = 50,
                                               sendBufferSize = 300000,
@@ -73,7 +75,10 @@ class SocketServerTest extends JUnitSuite {
     channel.sendResponse(new RequestChannel.Response(request.processor, 
request, send))
   }
 
-  def connect(s:SocketServer = server) = new Socket("localhost", s.boundPort)
+  def connect(s:SocketServer = server, protocol: SecurityProtocol = 
SecurityProtocol.PLAINTEXT) = {
+    new Socket("localhost", s.boundPort(protocol)) // connect to the server passed in (defaults to the shared `server`)
+  }
+
 
   @After
   def cleanup() {
@@ -81,7 +86,8 @@ class SocketServerTest extends JUnitSuite {
   }
   @Test
   def simpleRequest() {
-    val socket = connect()
+    val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val traceSocket = connect(protocol = SecurityProtocol.TRACE)
     val correlationId = -1
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
@@ -95,9 +101,15 @@ class SocketServerTest extends JUnitSuite {
     val serializedBytes = new Array[Byte](byteBuffer.remaining)
     byteBuffer.get(serializedBytes)
 
-    sendRequest(socket, 0, serializedBytes)
+    // Test PLAINTEXT socket
+    sendRequest(plainSocket, 0, serializedBytes)
+    processRequest(server.requestChannel)
+    assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq)
+
+    // Test TRACE socket
+    sendRequest(traceSocket, 0, serializedBytes)
     processRequest(server.requestChannel)
-    assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq)
+    assertEquals(serializedBytes.toSeq, receiveResponse(traceSocket).toSeq)
   }
 
   @Test(expected = classOf[IOException])
@@ -129,21 +141,38 @@ class SocketServerTest extends JUnitSuite {
       "Socket key should be available for reads")
   }
 
-  @Test(expected = classOf[IOException])
+  @Test
   def testSocketsCloseOnShutdown() {
     // open a connection
-    val socket = connect()
+    val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val traceSocket = connect(protocol = SecurityProtocol.TRACE)
     val bytes = new Array[Byte](40)
     // send a request first to make sure the connection has been picked up by 
the socket server
-    sendRequest(socket, 0, bytes)
+    sendRequest(plainSocket, 0, bytes)
+    sendRequest(traceSocket, 0, bytes)
     processRequest(server.requestChannel)
+
+    // make sure the sockets are open
+    server.acceptors.values.foreach(acceptor => 
assertFalse(acceptor.serverChannel.socket.isClosed))
     // then shutdown the server
     server.shutdown()
 
     val largeChunkOfBytes = new Array[Byte](1000000)
     // doing a subsequent send should throw an exception as the connection 
should be closed.
     // send a large chunk of bytes to trigger a socket flush
-    sendRequest(socket, 0, largeChunkOfBytes)
+    try {
+      sendRequest(plainSocket, 0, largeChunkOfBytes)
+      fail("expected exception when writing to closed plain socket")
+    } catch {
+      case e: IOException => // expected
+    }
+
+    try {
+      sendRequest(traceSocket, 0, largeChunkOfBytes)
+      fail("expected exception when writing to closed trace socket")
+    } catch {
+      case e: IOException => // expected
+    }
   }
 
   @Test
@@ -161,8 +190,7 @@ class SocketServerTest extends JUnitSuite {
     val overrideNum = 6
     val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
     val overrideServer: SocketServer = new SocketServer(0,
-                                                host = null,
-                                                port = 0,
+                                                Map(SecurityProtocol.PLAINTEXT 
-> EndPoint(null, 0, SecurityProtocol.PLAINTEXT)),
                                                 numProcessorThreads = 1,
                                                 maxQueuedRequests = 50,
                                                 sendBufferSize = 300000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala 
b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index d2ab683..3b82fb3 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -23,7 +23,7 @@ import junit.framework.Assert._
 import org.easymock.EasyMock
 import org.junit.Test
 import kafka.api._
-import kafka.cluster.Broker
+import kafka.cluster.{BrokerEndpoint, Broker}
 import kafka.common._
 import kafka.message._
 import kafka.producer.async._
@@ -165,8 +165,8 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val props = new Properties()
     props.put("metadata.broker.list", brokerList)
-    val broker1 = new Broker(0, "localhost", 9092)
-    val broker2 = new Broker(1, "localhost", 9093)
+    val broker1 = new BrokerEndpoint(0, "localhost", 9092)
+    val broker2 = new BrokerEndpoint(1, "localhost", 9093)
 
     // form expected partitions metadata
     val partition1Metadata = new PartitionMetadata(0, Some(broker1), 
List(broker1, broker2))
@@ -469,7 +469,7 @@ class AsyncProducerTest extends JUnit3Suite {
   }
 
   private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: 
Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
-    val broker1 = new Broker(brokerId, brokerHost, brokerPort)
+    val broker1 = new BrokerEndpoint(brokerId, brokerHost, brokerPort)
     new TopicMetadata(topic, partition.map(new PartitionMetadata(_, 
Some(broker1), List(broker1))))
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 7d6f655..8c3fb7a 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -19,16 +19,18 @@ package kafka.producer
 
 import java.net.SocketTimeoutException
 import java.util.Properties
+
 import junit.framework.Assert
 import kafka.admin.AdminUtils
+import kafka.api.ProducerResponseStatus
+import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.integration.KafkaServerTestHarness
 import kafka.message._
 import kafka.server.KafkaConfig
 import kafka.utils._
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
-import kafka.api.ProducerResponseStatus
-import kafka.common.{TopicAndPartition, ErrorMapping}
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private val messageBytes =  new Array[Byte](2)
@@ -38,7 +40,9 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
   @Test
   def testReachableServer() {
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
+
+    val props = 
TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
+
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val firstStart = SystemTime.milliseconds
@@ -73,7 +77,8 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
   @Test
   def testEmptyProduceRequest() {
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
+    val props = 
TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
+
 
     val correlationId = 0
     val clientId = SyncProducerConfig.DefaultClientId
@@ -90,7 +95,7 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
   @Test
   def testMessageSizeTooLarge() {
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
+    val props = 
TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     TestUtils.createTopic(zkClient, "test", numPartitions = 1, 
replicationFactor = 1, servers = servers)
@@ -117,8 +122,8 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
   @Test
   def testMessageSizeTooLargeWithAckZero() {
     val server = servers.head
+    val props = 
TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
 
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
     props.put("request.required.acks", "0")
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
@@ -144,7 +149,7 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
   @Test
   def testProduceCorrectlyReceivesResponse() {
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
+    val props = 
TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new 
Message(messageBytes))
@@ -190,7 +195,7 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
     val timeoutMs = 500
 
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
+    val props = 
TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
     val producer = new SyncProducer(new SyncProducerConfig(props))
 
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new 
Message(messageBytes))
@@ -216,7 +221,9 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
   @Test
   def testProduceRequestWithNoResponse() {
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
+
+    val port = server.socketServer.boundPort(SecurityProtocol.PLAINTEXT)
+    val props = TestUtils.getSyncProducerConfig(port)
     val correlationId = 0
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
@@ -231,8 +238,8 @@ class SyncProducerTest extends JUnit3Suite with 
KafkaServerTestHarness {
   def testNotEnoughReplicas()  {
     val topicName = "minisrtest"
     val server = servers.head
+    val props = 
TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
 
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
     props.put("request.required.acks", "-1")
 
     val producer = new SyncProducer(new SyncProducerConfig(props))

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala 
b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index b011240..48d3143 100644
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -17,10 +17,11 @@
 
 package kafka.server
 
-import org.scalatest.junit.JUnit3Suite
-import kafka.zk.ZooKeeperTestHarness
 import junit.framework.Assert._
-import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.utils.{TestUtils, Utils, ZkUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.scalatest.junit.JUnit3Suite
 
 class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
   var server : KafkaServer = null
@@ -30,10 +31,11 @@ class AdvertiseBrokerTest extends JUnit3Suite with 
ZooKeeperTestHarness {
 
   override def setUp() {
     super.setUp()
+
     val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
     props.put("advertised.host.name", advertisedHostName)
     props.put("advertised.port", advertisedPort.toString)
-    
+
     server = TestUtils.createServer(KafkaConfig.fromProps(props))
   }
 
@@ -45,8 +47,9 @@ class AdvertiseBrokerTest extends JUnit3Suite with 
ZooKeeperTestHarness {
   
   def testBrokerAdvertiseToZK {
     val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId)
-    assertEquals(advertisedHostName, brokerInfo.get.host)
-    assertEquals(advertisedPort, brokerInfo.get.port)
+    val endpoint = brokerInfo.get.endPoints.get(SecurityProtocol.PLAINTEXT).get
+    assertEquals(advertisedHostName, endpoint.host)
+    assertEquals(advertisedPort, endpoint.port)
   }
   
 }
\ No newline at end of file

Reply via email to