This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 27c2c81  KAFKA-6320: Move ZK metrics in KafkaHealthCheck to 
ZookeeperClient (#4351)
27c2c81 is described below

commit 27c2c81663d6518a3da5cfde88109f90ded8a6db
Author: Jun Rao <[email protected]>
AuthorDate: Fri Dec 22 02:28:57 2017 -0800

    KAFKA-6320: Move ZK metrics in KafkaHealthCheck to ZookeeperClient (#4351)
    
    * Moved metrics in KafkaHealthCheck to ZookeeperClient.
    * Converted remaining ZkUtils usage in KafkaServer to ZookeeperClient and 
removed ZkUtils from KafkaServer.
    * Made the re-creation of ZooKeeper during ZK session expiration with 
infinite retries.
    * Added unit tests for all new methods in KafkaZkClient.
    
    Reviewers: Manikumar Reddy <[email protected]>, Ismael Juma 
<[email protected]>
---
 .../scala/kafka/controller/KafkaController.scala   |   8 +-
 .../transaction/ProducerIdManager.scala            |   8 +-
 .../kafka/security/auth/SimpleAclAuthorizer.scala  |   3 +-
 .../main/scala/kafka/server/KafkaHealthcheck.scala | 113 ---------------------
 core/src/main/scala/kafka/server/KafkaServer.scala |  67 +++++-------
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  82 ++++++++++++++-
 core/src/main/scala/kafka/zk/ZkData.scala          |  41 ++++++++
 .../scala/kafka/zookeeper/ZooKeeperClient.scala    |  78 ++++++++++----
 .../scala/integration/kafka/api/MetricsTest.scala  |  26 +++--
 .../test/scala/unit/kafka/admin/AdminTest.scala    |  22 ++--
 .../unit/kafka/server/ServerShutdownTest.scala     |   4 +-
 .../unit/kafka/server/ServerStartupTest.scala      |   2 -
 .../kafka/server/SessionExpireListenerTest.scala   |  73 -------------
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    |  46 ++++++++-
 .../unit/kafka/zookeeper/ZooKeeperClientTest.scala |  47 ++++++++-
 .../integration/InternalTopicIntegrationTest.java  |   3 +-
 .../streams/integration/utils/KafkaEmbedded.java   |   8 +-
 17 files changed, 340 insertions(+), 291 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index f272851..ca8422e 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1416,13 +1416,7 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
 
     override def process(): Unit = {
       zkClient.registerBrokerInZk(brokerInfo)
-      val wasActiveBeforeChange = isActive
-      
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
-      activeControllerId = zkClient.getControllerId.getOrElse(-1)
-      if (wasActiveBeforeChange && !isActive) {
-        onControllerResignation()
-      }
-      elect()
+      Reelect.process()
     }
   }
 
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala 
b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 716e3d1..9c815bc 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -20,7 +20,7 @@ import java.nio.charset.StandardCharsets
 
 import kafka.common.KafkaException
 import kafka.utils.{Json, Logging, ZkUtils}
-import kafka.zk.KafkaZkClient
+import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
 
 import scala.collection.JavaConverters._
 
@@ -87,7 +87,7 @@ class ProducerIdManager(val brokerId: Int, val zkClient: 
KafkaZkClient) extends
     var zkWriteComplete = false
     while (!zkWriteComplete) {
       // refresh current producerId block from zookeeper again
-      val (dataOpt, zkVersion) = 
zkClient.getDataAndVersion(ZkUtils.ProducerIdBlockPath)
+      val (dataOpt, zkVersion) = 
zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
 
       // generate the new producerId block
       currentProducerIdBlock = dataOpt match {
@@ -110,7 +110,7 @@ class ProducerIdManager(val brokerId: Int, val zkClient: 
KafkaZkClient) extends
       val newProducerIdBlockData = 
ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
 
       // try to write the new producerId block into zookeeper
-      val (succeeded, version) = 
zkClient.conditionalUpdatePath(ZkUtils.ProducerIdBlockPath,
+      val (succeeded, version) = 
zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
         newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
       zkWriteComplete = succeeded
 
@@ -122,7 +122,7 @@ class ProducerIdManager(val brokerId: Int, val zkClient: 
KafkaZkClient) extends
   private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: 
String, expectedData: Array[Byte]): (Boolean, Int) = {
     try {
       val expectedPidBlock = 
ProducerIdManager.parseProducerIdBlockData(expectedData)
-      zkClient.getDataAndVersion(ZkUtils.ProducerIdBlockPath) match {
+      zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
         case (Some(data), zkVersion) =>
           val currProducerIdBLock = 
ProducerIdManager.parseProducerIdBlockData(data)
           (currProducerIdBLock == expectedPidBlock, zkVersion)
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala 
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 80d85a0..74bc809 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -92,7 +92,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     val zkMaxInFlightRequests = 
configs.get(SimpleAclAuthorizer.ZkMaxInFlightRequests).map(_.toString.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
 
     val time = Time.SYSTEM
-    val zooKeeperClient = new ZooKeeperClient(zkUrl, zkSessionTimeOutMs, 
zkConnectionTimeoutMs, zkMaxInFlightRequests, time)
+    val zooKeeperClient = new ZooKeeperClient(zkUrl, zkSessionTimeOutMs, 
zkConnectionTimeoutMs, zkMaxInFlightRequests,
+      time, "kafka.security", "SimpleAclAuthorizer")
     zkClient = new KafkaZkClient(zooKeeperClient, 
kafkaConfig.zkEnableSecureAcls, time)
     zkClient.createAclPaths()
 
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
deleted file mode 100644
index 2ad8168..0000000
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import java.net.InetAddress
-import java.util.Locale
-import java.util.concurrent.TimeUnit
-
-import kafka.api.ApiVersion
-import kafka.cluster.EndPoint
-import kafka.metrics.KafkaMetricsGroup
-import kafka.utils._
-import com.yammer.metrics.core.Gauge
-import org.I0Itec.zkclient.IZkStateListener
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.zookeeper.Watcher.Event.KeeperState
-
-import scala.collection.mutable.Set
-
-/**
-  * This class registers the broker in zookeeper to allow
-  * other brokers and consumers to detect failures. It uses an ephemeral znode 
with the path:
-  *   /brokers/ids/[0...N] --> advertisedHost:advertisedPort
-  *
-  * Right now our definition of health is fairly naive. If we register in zk 
we are healthy, otherwise
-  * we are dead.
-  */
-class KafkaHealthcheck(brokerId: Int,
-                       advertisedEndpoints: Seq[EndPoint],
-                       zkUtils: ZkUtils,
-                       rack: Option[String],
-                       interBrokerProtocolVersion: ApiVersion) extends Logging 
{
-
-  private[server] val sessionExpireListener = new SessionExpireListener
-
-  def startup() {
-    zkUtils.subscribeStateChanges(sessionExpireListener)
-    // registration is done in KafkaServer now
-  }
-
-  def shutdown(): Unit = sessionExpireListener.shutdown()
-
-  /**
-    *  When we get a SessionExpired event, it means that we have lost all 
ephemeral nodes and ZKClient has re-established
-    *  a connection for us. We need to re-register this broker in the broker 
registry. We rely on `handleStateChanged`
-    *  to record ZooKeeper connection state metrics.
-    */
-  class SessionExpireListener extends IZkStateListener with KafkaMetricsGroup {
-
-    private val metricNames = Set[String]()
-
-    private[server] val stateToMeterMap = {
-      import KeeperState._
-      val stateToEventTypeMap = Map(
-        Disconnected -> "Disconnects",
-        SyncConnected -> "SyncConnects",
-        AuthFailed -> "AuthFailures",
-        ConnectedReadOnly -> "ReadOnlyConnects",
-        SaslAuthenticated -> "SaslAuthentications",
-        Expired -> "Expires"
-      )
-      stateToEventTypeMap.map { case (state, eventType) =>
-        val name = s"ZooKeeper${eventType}PerSec"
-        metricNames += name
-        state -> newMeter(name, eventType.toLowerCase(Locale.ROOT), 
TimeUnit.SECONDS)
-      }
-    }
-
-    private[server] val sessionStateGauge =
-      newGauge("SessionState", new Gauge[String] {
-        override def value: String =
-          
Option(zkUtils.zkConnection.getZookeeperState.toString).getOrElse("DISCONNECTED")
-      })
-
-    metricNames += "SessionState"
-
-    @throws[Exception]
-    override def handleStateChanged(state: KeeperState) {
-      stateToMeterMap.get(state).foreach(_.mark())
-    }
-
-    @throws[Exception]
-    override def handleNewSession() {
-      //info("re-registering broker info in ZK for broker " + brokerId)
-      //register()
-      //info("done re-registering broker")
-      //info("Subscribing to %s path to watch for new 
topics".format(ZkUtils.BrokerTopicsPath))
-    }
-
-    override def handleSessionEstablishmentError(err: Throwable) {
-      error("Could not establish session with zookeeper", err)
-    }
-
-    def shutdown(): Unit = metricNames.foreach(removeMetric(_))
-
-  }
-
-}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5bd3f8e..8643233 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, 
AtomicInteger}
 import com.yammer.metrics.core.Gauge
 import kafka.api.KAFKA_0_9_0
 import kafka.cluster.{Broker, EndPoint}
-import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
+import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, 
KafkaException}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
@@ -133,11 +133,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
 
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
 
-  var kafkaHealthcheck: KafkaHealthcheck = null
   var metadataCache: MetadataCache = null
   var quotaManagers: QuotaFactory.QuotaManagers = null
 
-  var zkUtils: ZkUtils = null
   private var _zkClient: KafkaZkClient = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
   val brokerMetaPropsFile = "meta.properties"
@@ -199,10 +197,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
         kafkaScheduler.startup()
 
         /* setup zookeeper */
-        zkUtils = initZk()
+        initZkClient(time)
 
         /* Get or create cluster_id */
-        _clusterId = getOrGenerateClusterId(zkUtils)
+        _clusterId = getOrGenerateClusterId(zkClient)
         info(s"Cluster ID = $clusterId")
 
         /* generate brokerId */
@@ -226,10 +224,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
 
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
-        val zooKeeperClient = new ZooKeeperClient(config.zkConnect, 
config.zkSessionTimeoutMs,
-          config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time)
-        _zkClient = new KafkaZkClient(zooKeeperClient, zkUtils.isSecure, time)
-
         /* start log manager */
         logManager = LogManager(config, initialOfflineDirs, zkClient, 
brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
         logManager.startup()
@@ -252,12 +246,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
             endpoint
         }
 
-        // to be cleaned up in KAFKA-6320
-        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, 
zkUtils, config.rack,
-          config.interBrokerProtocolVersion)
-        kafkaHealthcheck.startup()
-        // KAFKA-6320
-
         val updatedEndpoints = listeners.map(endpoint =>
           if (endpoint.host == null || endpoint.host.trim.isEmpty)
             endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
@@ -277,7 +265,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
           updatedEndpoints, jmxPort, config.rack, 
config.interBrokerProtocolVersion)
         zkClient.registerBrokerInZk(brokerInfo)
 
-        // Now that the broker id is successfully registered via 
KafkaHealthcheck, checkpoint it
+        // Now that the broker id is successfully registered, checkpoint it
         checkpointBrokerId(config.brokerId)
 
         /* start kafka controller */
@@ -351,7 +339,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
     new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, 
logManager, isShuttingDown, quotaManagers,
       brokerTopicStats, metadataCache, logDirFailureChannel)
 
-  private def initZk(): ZkUtils = {
+  private def initZkClient(time: Time): Unit = {
     info(s"Connecting to zookeeper on ${config.zkConnect}")
 
     val chrootIndex = config.zkConnect.indexOf("/")
@@ -366,29 +354,25 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
     if (secureAclsEnabled && !isZkSecurityEnabled)
       throw new 
java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true, 
but the verification of the JAAS login file failed.")
 
+    // make sure chroot path exists
     chrootOption.foreach { chroot =>
       val zkConnForChrootCreation = config.zkConnect.substring(0, chrootIndex)
-      val zkClientForChrootCreation = 
ZkUtils.withMetrics(zkConnForChrootCreation,
-                                              sessionTimeout = 
config.zkSessionTimeoutMs,
-                                              connectionTimeout = 
config.zkConnectionTimeoutMs,
-                                              secureAclsEnabled,
-                                              time)
-      zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
+      val zooKeeperClient = new ZooKeeperClient(zkConnForChrootCreation, 
config.zkSessionTimeoutMs,
+        config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time)
+      val zkClient = new KafkaZkClient(zooKeeperClient, secureAclsEnabled, 
time)
+      zkClient.makeSurePersistentPathExists(chroot)
       info(s"Created zookeeper path $chroot")
-      zkClientForChrootCreation.close()
+      zkClient.close()
     }
 
-    val zkUtils = ZkUtils.withMetrics(config.zkConnect,
-                          sessionTimeout = config.zkSessionTimeoutMs,
-                          connectionTimeout = config.zkConnectionTimeoutMs,
-                          secureAclsEnabled,
-                          time)
-    zkUtils.setupCommonPaths()
-    zkUtils
+    val zooKeeperClient = new ZooKeeperClient(config.zkConnect, 
config.zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time)
+    _zkClient = new KafkaZkClient(zooKeeperClient, secureAclsEnabled, time)
+    _zkClient.createTopLevelPaths()
   }
 
-  def getOrGenerateClusterId(zkUtils: ZkUtils): String = {
-    
zkUtils.getClusterId.getOrElse(zkUtils.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
+  def getOrGenerateClusterId(zkClient: KafkaZkClient): String = {
+    
zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
   }
 
   /**
@@ -455,9 +439,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
 
           // Get the current controller info. This is to ensure we use the 
most recent info to issue the
           // controlled shutdown request
-          val controllerId = zkUtils.getController()
+          val controllerId = zkClient.getControllerId.getOrElse(throw new 
KafkaException("Controller doesn't exist"))
           //If this method returns None ignore and try again
-          zkUtils.getBrokerInfo(controllerId).foreach { broker =>
+          zkClient.getBroker(controllerId).foreach { broker =>
             // if this is the first attempt, if the controller has changed or 
if an exception was thrown in a previous
             // attempt, connect to the most recent controller
             if (ioException || broker != prevController) {
@@ -549,9 +533,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
         CoreUtils.swallow(controlledShutdown(), this)
         brokerState.newState(BrokerShuttingDown)
 
-        if (kafkaHealthcheck != null)
-          CoreUtils.swallow(kafkaHealthcheck.shutdown(), this)
-
         if (dynamicConfigManager != null)
           CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
 
@@ -582,8 +563,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
 
         if (kafkaController != null)
           CoreUtils.swallow(kafkaController.shutdown(), this)
-        if (zkUtils != null)
-          CoreUtils.swallow(zkUtils.close(), this)
+
         if (zkClient != null)
           CoreUtils.swallow(zkClient.close(), this)
 
@@ -689,9 +669,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
     }
   }
 
+  /**
+    * Return a sequence id generated by updating the broker sequence id path 
in ZK.
+    * Users can provide brokerId in the config. To avoid conflicts between ZK 
generated
+    * sequence id and configured brokerId, we increment the generated sequence 
id by KafkaConfig.MaxReservedBrokerId.
+    */
   private def generateBrokerId: Int = {
     try {
-      zkUtils.getBrokerSequenceId(config.maxReservedBrokerId)
+      zkClient.generateBrokerSequenceId() + config.maxReservedBrokerId
     } catch {
       case e: Exception =>
         error("Failed to generate broker.id due to ", e)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 13fd024..8c3f018 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -18,10 +18,10 @@ package kafka.zk
 
 import java.util.Properties
 
-
 import com.yammer.metrics.core.MetricName
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
+import kafka.common.KafkaException
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
@@ -32,7 +32,7 @@ import kafka.utils._
 import kafka.zookeeper._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
-import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException}
 
@@ -299,6 +299,20 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, 
isSecure: Boolean, time: T
     }
   }
 
+  /**
+    * Get a broker from ZK
+    * @return an optional Broker
+    */
+  def getBroker(brokerId: Int): Option[Broker] = {
+    val getDataRequest = GetDataRequest(BrokerIdZNode.path(brokerId))
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+    getDataResponse.resultCode match {
+      case Code.OK =>
+        Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
+      case Code.NONODE => None
+      case _ => throw getDataResponse.resultException.get
+    }
+  }
 
   /**
    * Gets the list of sorted broker Ids
@@ -1174,6 +1188,67 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, 
isSecure: Boolean, time: T
     }
   }
 
+  /**
+    * Get the cluster id.
+    * @return optional cluster id in String.
+    */
+  def getClusterId: Option[String] = {
+    val getDataRequest = GetDataRequest(ClusterIdZNode.path)
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+    getDataResponse.resultCode match {
+      case Code.OK => Some(ClusterIdZNode.fromJson(getDataResponse.data))
+      case Code.NONODE => None
+      case _ => throw getDataResponse.resultException.get
+    }
+  }
+
+  /**
+    * Create the cluster Id. If the cluster id already exists, return the 
current cluster id.
+    * @return  cluster id
+    */
+  def createOrGetClusterId(proposedClusterId: String): String = {
+    try {
+      createRecursive(ClusterIdZNode.path, 
ClusterIdZNode.toJson(proposedClusterId))
+      proposedClusterId
+    } catch {
+      case e: NodeExistsException => getClusterId.getOrElse(
+        throw new KafkaException("Failed to get cluster id from Zookeeper. 
This can happen if /cluster/id is deleted from Zookeeper."))
+    }
+  }
+
+  /**
+    * Generate a borker id by updating the broker sequence id path in ZK and 
return the version of the path.
+    * The version is incremented by one on every update starting from 1.
+    * @return sequence number as the broker id
+    */
+  def generateBrokerSequenceId(): Int = {
+    val setDataRequest = SetDataRequest(BrokerSequenceIdZNode.path, 
Array.empty[Byte], -1)
+    val setDataResponse = retryRequestUntilConnected(setDataRequest)
+    setDataResponse.resultCode match {
+      case Code.OK => setDataResponse.stat.getVersion
+      case Code.NONODE =>
+        // maker sure the path exists
+        createRecursive(BrokerSequenceIdZNode.path, Array.empty[Byte], 
throwIfPathExists = false)
+        generateBrokerSequenceId()
+      case _ => throw setDataResponse.resultException.get
+    }
+  }
+
+  /**
+    * Pre-create top level paths in ZK if needed.
+    */
+  def createTopLevelPaths(): Unit = {
+    ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists(_))
+  }
+
+  /**
+    * Make sure a persistent path exists in ZK.
+    * @param path
+    */
+  def makeSurePersistentPathExists(path: String): Unit = {
+    createRecursive(path, data = null, throwIfPathExists = false)
+  }
+
   private def setConsumerOffset(group: String, topicPartition: TopicPartition, 
offset: Long): SetDataResponse = {
     val setDataRequest = SetDataRequest(ConsumerOffset.path(group, 
topicPartition.topic, topicPartition.partition),
       ConsumerOffset.encode(offset), ZkVersion.NoVersion)
@@ -1246,7 +1321,8 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, 
isSecure: Boolean, time: T
     } else if (createResponse.resultCode == Code.NONODE) {
       createRecursive0(parentPath(path))
       createResponse = retryRequestUntilConnected(createRequest)
-      createResponse.maybeThrow
+      if (throwIfPathExists || createResponse.resultCode != Code.NODEEXISTS)
+        createResponse.maybeThrow
     } else if (createResponse.resultCode != Code.NODEEXISTS)
       createResponse.maybeThrow
 
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala 
b/core/src/main/scala/kafka/zk/ZkData.scala
index a03263c..9578129 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -21,12 +21,15 @@ import java.util.Properties
 
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
+import kafka.common.KafkaException
 import kafka.controller.{IsrChangeNotificationHandler, 
LeaderIsrAndControllerEpoch}
 import kafka.security.auth.{Acl, Resource}
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
+import kafka.server.ConfigType
 import kafka.utils.Json
 import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.data.Stat
+
 import scala.collection.JavaConverters._
 
 // This file contains objects for encoding/decoding data stored in ZooKeeper 
nodes (znodes).
@@ -338,3 +341,41 @@ object AclChangeNotificationSequenceZNode {
   def encode(resourceName : String): Array[Byte] = resourceName.getBytes(UTF_8)
   def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
 }
+
+object ClusterIdZNode {
+  def path = "/cluster/id"
+
+  def toJson(id: String): Array[Byte] = {
+    Json.encodeAsBytes(Map("version" -> "1", "id" -> id).asJava)
+  }
+
+  def fromJson(clusterIdJson:  Array[Byte]): String = {
+    
Json.parseBytes(clusterIdJson).map(_.asJsonObject("id").to[String]).getOrElse {
+      throw new KafkaException(s"Failed to parse the cluster id json 
$clusterIdJson")
+    }
+  }
+}
+
+object BrokerSequenceIdZNode {
+  def path = s"${BrokersZNode.path}/seqid"
+}
+
+object ProducerIdBlockZNode {
+  def path = "/latest_producer_id_block"
+}
+
+object ZkData {
+  // These are persistent ZK paths that should exist on kafka broker startup.
+  val PersistentZkPaths = Seq(
+    "/consumers",  // old consumer path
+    BrokerIdsZNode.path,
+    TopicsZNode.path,
+    ConfigEntityChangeNotificationZNode.path,
+    ConfigEntityTypeZNode.path(ConfigType.Topic),
+    ConfigEntityTypeZNode.path(ConfigType.Client),
+    DeleteTopicsZNode.path,
+    BrokerSequenceIdZNode.path,
+    IsrChangeNotificationZNode.path,
+    ProducerIdBlockZNode.path,
+    LogDirEventNotificationZNode.path)
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index a0898da..6d786dc 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -17,9 +17,12 @@
 
 package kafka.zookeeper
 
+import java.util.Locale
 import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
 import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, 
CountDownLatch, Semaphore, TimeUnit}
 
+import com.yammer.metrics.core.{Gauge, MetricName}
+import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
 import kafka.utils.Logging
 import org.apache.kafka.common.utils.Time
@@ -31,6 +34,7 @@ import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, 
Watcher, ZooKeeper}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.Set
 
 /**
  * A ZooKeeper client that encourages pipelined requests.
@@ -44,7 +48,9 @@ class ZooKeeperClient(connectString: String,
                       sessionTimeoutMs: Int,
                       connectionTimeoutMs: Int,
                       maxInFlightRequests: Int,
-                      time: Time) extends Logging {
+                      time: Time,
+                      metricGroup: String = "kafka.server",
+                      metricType: String = "KafkaHealthcheck") extends Logging 
with KafkaMetricsGroup {
   this.logIdent = "[ZooKeeperClient] "
   private val initializationLock = new ReentrantReadWriteLock()
   private val isConnectedOrExpiredLock = new ReentrantLock()
@@ -54,10 +60,47 @@ class ZooKeeperClient(connectString: String,
   private val inFlightRequests = new Semaphore(maxInFlightRequests)
   private val stateChangeHandlers = new ConcurrentHashMap[String, 
StateChangeHandler]().asScala
 
+  private val metricNames = Set[String]()
+
+  // The state map has to be created before creating ZooKeeper since it's 
needed in the ZooKeeper callback.
+  private val stateToMeterMap = {
+    import KeeperState._
+    val stateToEventTypeMap = Map(
+      Disconnected -> "Disconnects",
+      SyncConnected -> "SyncConnects",
+      AuthFailed -> "AuthFailures",
+      ConnectedReadOnly -> "ReadOnlyConnects",
+      SaslAuthenticated -> "SaslAuthentications",
+      Expired -> "Expires"
+    )
+    stateToEventTypeMap.map { case (state, eventType) =>
+      val name = s"ZooKeeper${eventType}PerSec"
+      metricNames += name
+      state -> newMeter(name, eventType.toLowerCase(Locale.ROOT), 
TimeUnit.SECONDS)
+    }
+  }
+
   info(s"Initializing a new session to $connectString.")
   @volatile private var zooKeeper = new ZooKeeper(connectString, 
sessionTimeoutMs, ZooKeeperClientWatcher)
+
+  private val sessionStateGauge =
+    newGauge("SessionState", new Gauge[String] {
+      override def value: String =
+        Option(zooKeeper.getState.toString).getOrElse("DISCONNECTED")
+    })
+
+  metricNames += "SessionState"
+
   waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
 
+
+  /**
+    * This is added to preserve the original metric name in JMX
+    */
+  override def metricName(name: String, metricTags: 
scala.collection.Map[String, String]): MetricName = {
+    explicitMetricName(metricGroup, metricType, name, metricTags)
+  }
+
   /**
    * Send a request and wait for its response. See handle(Seq[AsyncRequest]) 
for details.
    *
@@ -259,6 +302,7 @@ class ZooKeeperClient(connectString: String,
     zNodeChildChangeHandlers.clear()
     stateChangeHandlers.clear()
     zooKeeper.close()
+    metricNames.foreach(removeMetric(_))
     info("Closed.")
   }
 
@@ -269,25 +313,20 @@ class ZooKeeperClient(connectString: String,
   private def initialize(): Unit = {
     if (!zooKeeper.getState.isAlive) {
       info(s"Initializing a new session to $connectString.")
-      var now = System.currentTimeMillis()
-      val threshold = now + connectionTimeoutMs
-      while (now < threshold) {
+      // retry forever until ZooKeeper can be instantiated
+      while (true) {
         try {
           zooKeeper.close()
           zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, 
ZooKeeperClientWatcher)
-          waitUntilConnected(threshold - now, TimeUnit.MILLISECONDS)
           return
         } catch {
-          case _: Exception =>
-            now = System.currentTimeMillis()
-            if (now < threshold) {
-              Thread.sleep(1000)
-              now = System.currentTimeMillis()
-            }
+          case e: Exception =>
+            info("Error when recreating ZooKeeper", e)
+            Thread.sleep(1000)
         }
       }
       info(s"Timed out waiting for connection during session initialization 
while in state: ${zooKeeper.getState}")
-      stateChangeHandlers.foreach {case (name, handler) => 
handler.onReconnectionTimeout()}
+      stateChangeHandlers.values.foreach(_.onReconnectionTimeout())
     }
   }
 
@@ -299,23 +338,26 @@ class ZooKeeperClient(connectString: String,
     initialize()
   }
 
-  private object ZooKeeperClientWatcher extends Watcher {
+  // package level visibility for testing only
+  private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
     override def process(event: WatchedEvent): Unit = {
       debug("Received event: " + event)
       Option(event.getPath) match {
         case None =>
+          val state = event.getState
+          stateToMeterMap.get(state).foreach(_.mark())
           inLock(isConnectedOrExpiredLock) {
             isConnectedOrExpiredCondition.signalAll()
           }
-          if (event.getState == KeeperState.AuthFailed) {
+          if (state == KeeperState.AuthFailed) {
             error("Auth failed.")
-            stateChangeHandlers.foreach {case (name, handler) => 
handler.onAuthFailure()}
-          } else if (event.getState == KeeperState.Expired) {
+            stateChangeHandlers.values.foreach(_.onAuthFailure())
+          } else if (state == KeeperState.Expired) {
             inWriteLock(initializationLock) {
               info("Session expired.")
-              stateChangeHandlers.foreach {case (name, handler) => 
handler.beforeInitializingSession()}
+              stateChangeHandlers.values.foreach(_.beforeInitializingSession())
               initialize()
-              stateChangeHandlers.foreach {case (name, handler) => 
handler.afterInitializingSession()}
+              stateChangeHandlers.values.foreach(_.afterInitializingSession())
             }
           }
         case Some(path) =>
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala 
b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 4596887..167b6a6 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -205,15 +205,13 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
   }
 
   private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit 
= {
-    // Latency is rounded to milliseconds, so we may need to retry some 
operations to get latency > 0.
-    val (_, recorded) = TestUtils.computeUntilTrue({
-      servers.head.zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
-      
yammerMetricValue("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Double]
-    })(latency => latency > 0.0)
-    assertTrue("ZooKeeper latency not recorded", recorded)
-
-    assertEquals(s"Unexpected ZK state 
${server.zkUtils.zkConnection.getZookeeperState}",
-        "CONNECTED", yammerMetricValue("SessionState"))
+    // Latency is rounded to milliseconds, so check the count instead.
+    val initialCount = 
yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Long]
+    servers.head.zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
+    val newCount = 
yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Long]
+    assertTrue("ZooKeeper latency not recorded",  newCount > initialCount)
+
+    assertEquals(s"Unexpected ZK state", "CONNECTED", 
yammerMetricValue("SessionState"))
   }
 
   private def verifyBrokerErrorMetrics(server: KafkaServer): Unit = {
@@ -276,6 +274,16 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
     }
   }
 
+  private def yammerHistogramCount(name: String): Any = {
+    val allMetrics = Metrics.defaultRegistry.allMetrics.asScala
+    val (_, metric) = allMetrics.find { case (n, _) => 
n.getMBeanName.endsWith(name) }
+      .getOrElse(fail(s"Unable to find broker metric $name: allMetrics: 
${allMetrics.keySet.map(_.getMBeanName)}"))
+    metric match {
+      case m: Histogram => m.count
+      case m => fail(s"Unexpected broker metric of class ${m.getClass}")
+    }
+  }
+
   private def verifyYammerMetricRecorded(name: String, verify: Double => 
Boolean = d => d > 0): Double = {
     val metricValue = yammerMetricValue(name).asInstanceOf[Double]
     assertTrue(s"Broker metric not recorded correctly for $name value 
$metricValue", verify(metricValue))
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala 
b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 7a88237..fb594d0 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -445,35 +445,35 @@ class AdminTest extends ZooKeeperTestHarness with Logging 
with RackAwareTest {
     // create a topic with a few config overrides and check that they are 
applied
     val maxMessageSize = 1024
     val retentionMs = 1000 * 1000
-    AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, 
makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+    adminZkClient.createTopic(topic, partitions, 1, makeConfig(maxMessageSize, 
retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
 
     //Standard topic configs will be propagated at topic creation time, but 
the quota manager will not have been updated.
     checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", 
false)
 
     //Update dynamically and all properties should be applied
-    AdminUtils.changeTopicConfig(server.zkUtils, topic, 
makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+    adminZkClient.changeTopicConfig(topic, makeConfig(maxMessageSize, 
retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
 
     checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", 
true)
 
     // now double the config values for the topic and check that it is applied
     val newConfig = makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*")
-    AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2 * 
maxMessageSize, 2 * retentionMs, "*", "*"))
+    adminZkClient.changeTopicConfig(topic, makeConfig(2 * maxMessageSize, 2 * 
retentionMs, "*", "*"))
     checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*", 
quotaManagerIsThrottled = true)
 
     // Verify that the same config can be read from ZK
-    val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, 
ConfigType.Topic, topic)
+    val configInZk = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
     assertEquals(newConfig, configInZk)
 
     //Now delete the config
-    AdminUtils.changeTopicConfig(server.zkUtils, topic, new Properties)
+    adminZkClient.changeTopicConfig(topic, new Properties)
     checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", 
quotaManagerIsThrottled = false)
 
     //Add config back
-    AdminUtils.changeTopicConfig(server.zkUtils, topic, 
makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+    adminZkClient.changeTopicConfig(topic, makeConfig(maxMessageSize, 
retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
     checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", 
quotaManagerIsThrottled = true)
 
     //Now ensure updating to "" removes the throttled replica list also
-    AdminUtils.changeTopicConfig(server.zkUtils, topic, 
propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""), 
(LogConfig.LeaderReplicationThrottledReplicasProp, "")))
+    adminZkClient.changeTopicConfig(topic, 
propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""), 
(LogConfig.LeaderReplicationThrottledReplicasProp, "")))
     checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "",  
quotaManagerIsThrottled = false)
   }
 
@@ -494,27 +494,27 @@ class AdminTest extends ZooKeeperTestHarness with Logging 
with RackAwareTest {
     val limit: Long = 1000000
 
     // Set the limit & check it is applied to the log
-    changeBrokerConfig(zkUtils, brokerIds, propsWith(
+    adminZkClient.changeBrokerConfig(brokerIds, propsWith(
       (LeaderReplicationThrottledRateProp, limit.toString),
       (FollowerReplicationThrottledRateProp, limit.toString)))
     checkConfig(limit)
 
     // Now double the config values for the topic and check that it is applied
     val newLimit = 2 * limit
-    changeBrokerConfig(zkUtils, brokerIds,  propsWith(
+    adminZkClient.changeBrokerConfig(brokerIds,  propsWith(
       (LeaderReplicationThrottledRateProp, newLimit.toString),
       (FollowerReplicationThrottledRateProp, newLimit.toString)))
     checkConfig(newLimit)
 
     // Verify that the same config can be read from ZK
     for (brokerId <- brokerIds) {
-      val configInZk = AdminUtils.fetchEntityConfig(servers(brokerId).zkUtils, 
ConfigType.Broker, brokerId.toString)
+      val configInZk = adminZkClient.fetchEntityConfig(ConfigType.Broker, 
brokerId.toString)
       assertEquals(newLimit, 
configInZk.getProperty(LeaderReplicationThrottledRateProp).toInt)
       assertEquals(newLimit, 
configInZk.getProperty(FollowerReplicationThrottledRateProp).toInt)
     }
 
     //Now delete the config
-    changeBrokerConfig(servers(0).zkUtils, brokerIds, new Properties)
+    adminZkClient.changeBrokerConfig(brokerIds, new Properties)
     checkConfig(DefaultReplicationThrottledRate)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index bcddd40..135f7f1 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -23,6 +23,7 @@ import kafka.utils.TestUtils._
 import kafka.api.FetchRequestBuilder
 import kafka.message.ByteBufferMessageSet
 import java.io.File
+import java.net.UnknownHostException
 
 import kafka.log.LogManager
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -31,6 +32,7 @@ import 
org.apache.kafka.common.serialization.{IntegerSerializer, StringSerialize
 import org.I0Itec.zkclient.exception.ZkException
 import org.junit.{Before, Test}
 import org.junit.Assert._
+
 import scala.reflect.ClassTag
 
 class ServerShutdownTest extends ZooKeeperTestHarness {
@@ -130,7 +132,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     val newProps = TestUtils.createBrokerConfig(0, zkConnect)
     newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
     val newConfig = KafkaConfig.fromProps(newProps)
-    verifyCleanShutdownAfterFailedStartup[ZkException](newConfig)
+    verifyCleanShutdownAfterFailedStartup[UnknownHostException](newConfig)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index eedf5f3..ff9178a 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -17,8 +17,6 @@
 
 package kafka.server
 
-import java.net.BindException
-
 import kafka.common.KafkaException
 import kafka.utils.{TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
diff --git 
a/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala 
b/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
deleted file mode 100644
index fda17c0..0000000
--- a/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import kafka.api.ApiVersion
-import kafka.utils.ZkUtils
-import org.I0Itec.zkclient.ZkClient
-import org.apache.zookeeper.Watcher
-import org.easymock.EasyMock
-import org.junit.{Assert, Test}
-import Assert._
-import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.Meter
-import scala.collection.JavaConverters._
-
-class SessionExpireListenerTest {
-
-  private val brokerId = 1
-
-  private def cleanMetricsRegistry() {
-    val metrics = Metrics.defaultRegistry
-    metrics.allMetrics.keySet.asScala.foreach(metrics.removeMetric)
-  }
-
-  @Test
-  def testSessionExpireListenerMetrics() {
-
-    cleanMetricsRegistry()
-
-    val metrics = Metrics.defaultRegistry
-
-    def checkMeterCount(name: String, expected: Long) {
-      val meter = metrics.allMetrics.asScala.collectFirst {
-        case (metricName, meter: Meter) if metricName.getName == name => meter
-      }.getOrElse(sys.error(s"Unable to find meter with name $name"))
-      assertEquals(s"Unexpected meter count for $name", expected, meter.count)
-    }
-
-    val zkClient = EasyMock.mock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
-    import Watcher._
-    val healthcheck = new KafkaHealthcheck(brokerId, Seq.empty, zkUtils, None, 
ApiVersion.latestVersion)
-
-    val expiresPerSecName = "ZooKeeperExpiresPerSec"
-    val disconnectsPerSecName = "ZooKeeperDisconnectsPerSec"
-    checkMeterCount(expiresPerSecName, 0)
-    checkMeterCount(disconnectsPerSecName, 0)
-
-    
healthcheck.sessionExpireListener.handleStateChanged(Event.KeeperState.Expired)
-    checkMeterCount(expiresPerSecName, 1)
-    checkMeterCount(disconnectsPerSecName, 0)
-
-    
healthcheck.sessionExpireListener.handleStateChanged(Event.KeeperState.Disconnected)
-    checkMeterCount(expiresPerSecName, 1)
-    checkMeterCount(disconnectsPerSecName, 1)
-  }
-
-}
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index ecaa943..f0d6cf0 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -19,13 +19,17 @@ package kafka.zk
 import java.util.{Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
 
+import kafka.api.ApiVersion
+import kafka.cluster.EndPoint
 import kafka.log.LogConfig
 import kafka.security.auth._
 import kafka.server.ConfigType
+import kafka.utils.CoreUtils
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.zookeeper.KeeperException.NodeExistsException
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert._
 import org.junit.Test
 
 class KafkaZkClientTest extends ZooKeeperTestHarness {
@@ -412,6 +416,44 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testBrokerRegistrationMethods() {
+    zkClient.createTopLevelPaths()
+
+    val brokerInfo = new BrokerInfo(1, "test.host", 9999,
+      Seq(new EndPoint("test.host", 9999, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), 
SecurityProtocol.PLAINTEXT)),
+      9998, None, ApiVersion.latestVersion)
+
+    zkClient.registerBrokerInZk(brokerInfo)
+    val broker = zkClient.getBroker(1).getOrElse(fail("Unregistered broker"))
+
+    assertEquals(brokerInfo.id, broker.id)
+    assertEquals(brokerInfo.endpoints(), broker.endPoints.mkString(","))
+  }
+
+  @Test
+  def testClusterIdMethods() {
+    val clusterId = CoreUtils.generateUuidAsBase64
+
+    zkClient.createOrGetClusterId(clusterId)
+    assertEquals(clusterId, zkClient.getClusterId.getOrElse(fail("No cluster 
id found")))
+   }
+
+  @Test
+  def testBrokerSequenceIdMethods() {
+    val sequenceId = zkClient.generateBrokerSequenceId()
+    assertEquals(sequenceId + 1, zkClient.generateBrokerSequenceId)
+  }
+
+  @Test
+  def testCreateTopLevelPaths() {
+    zkClient.createTopLevelPaths()
+
+    ZkData.PersistentZkPaths.foreach {
+      path => assertTrue(zkClient.pathExists(path))
+    }
+  }
+
+  @Test
   def testPreferredReplicaElectionMethods() {
 
     assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala 
b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 75842f0..141dcee 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -23,18 +23,29 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
 import javax.security.auth.login.Configuration
 
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Meter
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
 import org.apache.kafka.common.utils.Time
-import org.apache.zookeeper.{CreateMode, ZooDefs}
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
+import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
+import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
 import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
-import org.junit.{After, Test}
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
 
 class ZooKeeperClientTest extends ZooKeeperTestHarness {
   private val mockPath = "/foo"
   private val time = Time.SYSTEM
 
+  @Before
+  override def setUp() {
+    cleanMetricsRegistry()
+    super.setUp()
+  }
+
   @After
   override def tearDown() {
     super.tearDown()
@@ -360,5 +371,35 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     }
   }
 
+  @Test
+  def testSessionExpireListenerMetrics() {
+    val metrics = Metrics.defaultRegistry
+
+    def checkMeterCount(name: String, expected: Long) {
+      val meter = metrics.allMetrics.asScala.collectFirst {
+        case (metricName, meter: Meter) if metricName.getName == name => meter
+      }.getOrElse(sys.error(s"Unable to find meter with name $name"))
+      assertEquals(s"Unexpected meter count for $name", expected, meter.count)
+    }
+
+    val expiresPerSecName = "ZooKeeperExpiresPerSec"
+    val disconnectsPerSecName = "ZooKeeperDisconnectsPerSec"
+    checkMeterCount(expiresPerSecName, 0)
+    checkMeterCount(disconnectsPerSecName, 0)
+
+    zooKeeperClient.ZooKeeperClientWatcher.process(new 
WatchedEvent(EventType.None, KeeperState.Expired, null))
+    checkMeterCount(expiresPerSecName, 1)
+    checkMeterCount(disconnectsPerSecName, 0)
+
+    zooKeeperClient.ZooKeeperClientWatcher.process(new 
WatchedEvent(EventType.None, KeeperState.Disconnected, null))
+    checkMeterCount(expiresPerSecName, 1)
+    checkMeterCount(disconnectsPerSecName, 1)
+  }
+
+  private def cleanMetricsRegistry() {
+    val metrics = Metrics.defaultRegistry
+    metrics.allMetrics.keySet.asScala.foreach(metrics.removeMetric)
+  }
+
   private def bytes = 
UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8)
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 71758a5..0551fac 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -97,7 +97,8 @@ public class InternalTopicIntegrationTest {
                 CLUSTER.zKConnectString(),
                 DEFAULT_ZK_SESSION_TIMEOUT_MS,
                 DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
-                Integer.MAX_VALUE, Time.SYSTEM);
+                Integer.MAX_VALUE, Time.SYSTEM,
+                "testMetricGroup", "testMetricType");
         final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, 
Time.SYSTEM);
         try {
             final AdminZkClient adminZkClient = new 
AdminZkClient(kafkaZkClient);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index e277d82..275d580 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -177,7 +177,9 @@ public class KafkaEmbedded {
                 DEFAULT_ZK_SESSION_TIMEOUT_MS,
                 DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
                 Integer.MAX_VALUE,
-                Time.SYSTEM);
+                Time.SYSTEM,
+                "testMetricGroup",
+                "testMetricType");
         final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, 
Time.SYSTEM);
         final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
         adminZkClient.createTopic(topic, partitions, replication, topicConfig, 
RackAwareMode.Enforced$.MODULE$);
@@ -192,7 +194,9 @@ public class KafkaEmbedded {
                 DEFAULT_ZK_SESSION_TIMEOUT_MS,
                 DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
                 Integer.MAX_VALUE,
-                Time.SYSTEM);
+                Time.SYSTEM,
+                "testMetricGroup",
+                "testMetricType");
         final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, 
Time.SYSTEM);
         final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
         adminZkClient.deleteTopic(topic);

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to