This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 27c2c81 KAFKA-6320: Move ZK metrics in KafkaHealthCheck to
ZookeeperClient (#4351)
27c2c81 is described below
commit 27c2c81663d6518a3da5cfde88109f90ded8a6db
Author: Jun Rao <[email protected]>
AuthorDate: Fri Dec 22 02:28:57 2017 -0800
KAFKA-6320: Move ZK metrics in KafkaHealthCheck to ZookeeperClient (#4351)
* Moved metrics in KafkaHealthCheck to ZookeeperClient.
* Converted remaining ZkUtils usage in KafkaServer to ZookeeperClient and
removed ZkUtils from KafkaServer.
* Made the re-creation of ZooKeeper during ZK session expiration retry
indefinitely.
* Added unit tests for all new methods in KafkaZkClient.
Reviewers: Manikumar Reddy <[email protected]>, Ismael Juma
<[email protected]>
---
.../scala/kafka/controller/KafkaController.scala | 8 +-
.../transaction/ProducerIdManager.scala | 8 +-
.../kafka/security/auth/SimpleAclAuthorizer.scala | 3 +-
.../main/scala/kafka/server/KafkaHealthcheck.scala | 113 ---------------------
core/src/main/scala/kafka/server/KafkaServer.scala | 67 +++++-------
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 82 ++++++++++++++-
core/src/main/scala/kafka/zk/ZkData.scala | 41 ++++++++
.../scala/kafka/zookeeper/ZooKeeperClient.scala | 78 ++++++++++----
.../scala/integration/kafka/api/MetricsTest.scala | 26 +++--
.../test/scala/unit/kafka/admin/AdminTest.scala | 22 ++--
.../unit/kafka/server/ServerShutdownTest.scala | 4 +-
.../unit/kafka/server/ServerStartupTest.scala | 2 -
.../kafka/server/SessionExpireListenerTest.scala | 73 -------------
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 46 ++++++++-
.../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 47 ++++++++-
.../integration/InternalTopicIntegrationTest.java | 3 +-
.../streams/integration/utils/KafkaEmbedded.java | 8 +-
17 files changed, 340 insertions(+), 291 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index f272851..ca8422e 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1416,13 +1416,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
override def process(): Unit = {
zkClient.registerBrokerInZk(brokerInfo)
- val wasActiveBeforeChange = isActive
-
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
- activeControllerId = zkClient.getControllerId.getOrElse(-1)
- if (wasActiveBeforeChange && !isActive) {
- onControllerResignation()
- }
- elect()
+ Reelect.process()
}
}
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 716e3d1..9c815bc 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -20,7 +20,7 @@ import java.nio.charset.StandardCharsets
import kafka.common.KafkaException
import kafka.utils.{Json, Logging, ZkUtils}
-import kafka.zk.KafkaZkClient
+import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import scala.collection.JavaConverters._
@@ -87,7 +87,7 @@ class ProducerIdManager(val brokerId: Int, val zkClient:
KafkaZkClient) extends
var zkWriteComplete = false
while (!zkWriteComplete) {
// refresh current producerId block from zookeeper again
- val (dataOpt, zkVersion) =
zkClient.getDataAndVersion(ZkUtils.ProducerIdBlockPath)
+ val (dataOpt, zkVersion) =
zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
// generate the new producerId block
currentProducerIdBlock = dataOpt match {
@@ -110,7 +110,7 @@ class ProducerIdManager(val brokerId: Int, val zkClient:
KafkaZkClient) extends
val newProducerIdBlockData =
ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
// try to write the new producerId block into zookeeper
- val (succeeded, version) =
zkClient.conditionalUpdatePath(ZkUtils.ProducerIdBlockPath,
+ val (succeeded, version) =
zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
zkWriteComplete = succeeded
@@ -122,7 +122,7 @@ class ProducerIdManager(val brokerId: Int, val zkClient:
KafkaZkClient) extends
private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path:
String, expectedData: Array[Byte]): (Boolean, Int) = {
try {
val expectedPidBlock =
ProducerIdManager.parseProducerIdBlockData(expectedData)
- zkClient.getDataAndVersion(ZkUtils.ProducerIdBlockPath) match {
+ zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
case (Some(data), zkVersion) =>
val currProducerIdBLock =
ProducerIdManager.parseProducerIdBlockData(data)
(currProducerIdBLock == expectedPidBlock, zkVersion)
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 80d85a0..74bc809 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -92,7 +92,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
val zkMaxInFlightRequests =
configs.get(SimpleAclAuthorizer.ZkMaxInFlightRequests).map(_.toString.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
val time = Time.SYSTEM
- val zooKeeperClient = new ZooKeeperClient(zkUrl, zkSessionTimeOutMs,
zkConnectionTimeoutMs, zkMaxInFlightRequests, time)
+ val zooKeeperClient = new ZooKeeperClient(zkUrl, zkSessionTimeOutMs,
zkConnectionTimeoutMs, zkMaxInFlightRequests,
+ time, "kafka.security", "SimpleAclAuthorizer")
zkClient = new KafkaZkClient(zooKeeperClient,
kafkaConfig.zkEnableSecureAcls, time)
zkClient.createAclPaths()
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
deleted file mode 100644
index 2ad8168..0000000
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.net.InetAddress
-import java.util.Locale
-import java.util.concurrent.TimeUnit
-
-import kafka.api.ApiVersion
-import kafka.cluster.EndPoint
-import kafka.metrics.KafkaMetricsGroup
-import kafka.utils._
-import com.yammer.metrics.core.Gauge
-import org.I0Itec.zkclient.IZkStateListener
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.zookeeper.Watcher.Event.KeeperState
-
-import scala.collection.mutable.Set
-
-/**
- * This class registers the broker in zookeeper to allow
- * other brokers and consumers to detect failures. It uses an ephemeral znode
with the path:
- * /brokers/ids/[0...N] --> advertisedHost:advertisedPort
- *
- * Right now our definition of health is fairly naive. If we register in zk
we are healthy, otherwise
- * we are dead.
- */
-class KafkaHealthcheck(brokerId: Int,
- advertisedEndpoints: Seq[EndPoint],
- zkUtils: ZkUtils,
- rack: Option[String],
- interBrokerProtocolVersion: ApiVersion) extends Logging
{
-
- private[server] val sessionExpireListener = new SessionExpireListener
-
- def startup() {
- zkUtils.subscribeStateChanges(sessionExpireListener)
- // registration is done in KafkaServer now
- }
-
- def shutdown(): Unit = sessionExpireListener.shutdown()
-
- /**
- * When we get a SessionExpired event, it means that we have lost all
ephemeral nodes and ZKClient has re-established
- * a connection for us. We need to re-register this broker in the broker
registry. We rely on `handleStateChanged`
- * to record ZooKeeper connection state metrics.
- */
- class SessionExpireListener extends IZkStateListener with KafkaMetricsGroup {
-
- private val metricNames = Set[String]()
-
- private[server] val stateToMeterMap = {
- import KeeperState._
- val stateToEventTypeMap = Map(
- Disconnected -> "Disconnects",
- SyncConnected -> "SyncConnects",
- AuthFailed -> "AuthFailures",
- ConnectedReadOnly -> "ReadOnlyConnects",
- SaslAuthenticated -> "SaslAuthentications",
- Expired -> "Expires"
- )
- stateToEventTypeMap.map { case (state, eventType) =>
- val name = s"ZooKeeper${eventType}PerSec"
- metricNames += name
- state -> newMeter(name, eventType.toLowerCase(Locale.ROOT),
TimeUnit.SECONDS)
- }
- }
-
- private[server] val sessionStateGauge =
- newGauge("SessionState", new Gauge[String] {
- override def value: String =
-
Option(zkUtils.zkConnection.getZookeeperState.toString).getOrElse("DISCONNECTED")
- })
-
- metricNames += "SessionState"
-
- @throws[Exception]
- override def handleStateChanged(state: KeeperState) {
- stateToMeterMap.get(state).foreach(_.mark())
- }
-
- @throws[Exception]
- override def handleNewSession() {
- //info("re-registering broker info in ZK for broker " + brokerId)
- //register()
- //info("done re-registering broker")
- //info("Subscribing to %s path to watch for new
topics".format(ZkUtils.BrokerTopicsPath))
- }
-
- override def handleSessionEstablishmentError(err: Throwable) {
- error("Could not establish session with zookeeper", err)
- }
-
- def shutdown(): Unit = metricNames.foreach(removeMetric(_))
-
- }
-
-}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5bd3f8e..8643233 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.{AtomicBoolean,
AtomicInteger}
import com.yammer.metrics.core.Gauge
import kafka.api.KAFKA_0_9_0
import kafka.cluster.{Broker, EndPoint}
-import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
+import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException,
KafkaException}
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
@@ -133,11 +133,9 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
- var kafkaHealthcheck: KafkaHealthcheck = null
var metadataCache: MetadataCache = null
var quotaManagers: QuotaFactory.QuotaManagers = null
- var zkUtils: ZkUtils = null
private var _zkClient: KafkaZkClient = null
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
@@ -199,10 +197,10 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
kafkaScheduler.startup()
/* setup zookeeper */
- zkUtils = initZk()
+ initZkClient(time)
/* Get or create cluster_id */
- _clusterId = getOrGenerateClusterId(zkUtils)
+ _clusterId = getOrGenerateClusterId(zkClient)
info(s"Cluster ID = $clusterId")
/* generate brokerId */
@@ -226,10 +224,6 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
- val zooKeeperClient = new ZooKeeperClient(config.zkConnect,
config.zkSessionTimeoutMs,
- config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time)
- _zkClient = new KafkaZkClient(zooKeeperClient, zkUtils.isSecure, time)
-
/* start log manager */
logManager = LogManager(config, initialOfflineDirs, zkClient,
brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()
@@ -252,12 +246,6 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
endpoint
}
- // to be cleaned up in KAFKA-6320
- kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners,
zkUtils, config.rack,
- config.interBrokerProtocolVersion)
- kafkaHealthcheck.startup()
- // KAFKA-6320
-
val updatedEndpoints = listeners.map(endpoint =>
if (endpoint.host == null || endpoint.host.trim.isEmpty)
endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
@@ -277,7 +265,7 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
updatedEndpoints, jmxPort, config.rack,
config.interBrokerProtocolVersion)
zkClient.registerBrokerInZk(brokerInfo)
- // Now that the broker id is successfully registered via
KafkaHealthcheck, checkpoint it
+ // Now that the broker id is successfully registered, checkpoint it
checkpointBrokerId(config.brokerId)
/* start kafka controller */
@@ -351,7 +339,7 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler,
logManager, isShuttingDown, quotaManagers,
brokerTopicStats, metadataCache, logDirFailureChannel)
- private def initZk(): ZkUtils = {
+ private def initZkClient(time: Time): Unit = {
info(s"Connecting to zookeeper on ${config.zkConnect}")
val chrootIndex = config.zkConnect.indexOf("/")
@@ -366,29 +354,25 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
if (secureAclsEnabled && !isZkSecurityEnabled)
throw new
java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true,
but the verification of the JAAS login file failed.")
+ // make sure chroot path exists
chrootOption.foreach { chroot =>
val zkConnForChrootCreation = config.zkConnect.substring(0, chrootIndex)
- val zkClientForChrootCreation =
ZkUtils.withMetrics(zkConnForChrootCreation,
- sessionTimeout =
config.zkSessionTimeoutMs,
- connectionTimeout =
config.zkConnectionTimeoutMs,
- secureAclsEnabled,
- time)
- zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
+ val zooKeeperClient = new ZooKeeperClient(zkConnForChrootCreation,
config.zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time)
+ val zkClient = new KafkaZkClient(zooKeeperClient, secureAclsEnabled,
time)
+ zkClient.makeSurePersistentPathExists(chroot)
info(s"Created zookeeper path $chroot")
- zkClientForChrootCreation.close()
+ zkClient.close()
}
- val zkUtils = ZkUtils.withMetrics(config.zkConnect,
- sessionTimeout = config.zkSessionTimeoutMs,
- connectionTimeout = config.zkConnectionTimeoutMs,
- secureAclsEnabled,
- time)
- zkUtils.setupCommonPaths()
- zkUtils
+ val zooKeeperClient = new ZooKeeperClient(config.zkConnect,
config.zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time)
+ _zkClient = new KafkaZkClient(zooKeeperClient, secureAclsEnabled, time)
+ _zkClient.createTopLevelPaths()
}
- def getOrGenerateClusterId(zkUtils: ZkUtils): String = {
-
zkUtils.getClusterId.getOrElse(zkUtils.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
+ def getOrGenerateClusterId(zkClient: KafkaZkClient): String = {
+
zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
}
/**
@@ -455,9 +439,9 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
// Get the current controller info. This is to ensure we use the
most recent info to issue the
// controlled shutdown request
- val controllerId = zkUtils.getController()
+ val controllerId = zkClient.getControllerId.getOrElse(throw new
KafkaException("Controller doesn't exist"))
//If this method returns None ignore and try again
- zkUtils.getBrokerInfo(controllerId).foreach { broker =>
+ zkClient.getBroker(controllerId).foreach { broker =>
// if this is the first attempt, if the controller has changed or
if an exception was thrown in a previous
// attempt, connect to the most recent controller
if (ioException || broker != prevController) {
@@ -549,9 +533,6 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
CoreUtils.swallow(controlledShutdown(), this)
brokerState.newState(BrokerShuttingDown)
- if (kafkaHealthcheck != null)
- CoreUtils.swallow(kafkaHealthcheck.shutdown(), this)
-
if (dynamicConfigManager != null)
CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
@@ -582,8 +563,7 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
if (kafkaController != null)
CoreUtils.swallow(kafkaController.shutdown(), this)
- if (zkUtils != null)
- CoreUtils.swallow(zkUtils.close(), this)
+
if (zkClient != null)
CoreUtils.swallow(zkClient.close(), this)
@@ -689,9 +669,14 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
}
}
+ /**
+ * Return a sequence id generated by updating the broker sequence id path
in ZK.
+ * Users can provide brokerId in the config. To avoid conflicts between ZK
generated
+ * sequence id and configured brokerId, we increment the generated sequence
id by KafkaConfig.MaxReservedBrokerId.
+ */
private def generateBrokerId: Int = {
try {
- zkUtils.getBrokerSequenceId(config.maxReservedBrokerId)
+ zkClient.generateBrokerSequenceId() + config.maxReservedBrokerId
} catch {
case e: Exception =>
error("Failed to generate broker.id due to ", e)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 13fd024..8c3f018 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -18,10 +18,10 @@ package kafka.zk
import java.util.Properties
-
import com.yammer.metrics.core.MetricName
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
+import kafka.common.KafkaException
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
@@ -32,7 +32,7 @@ import kafka.utils._
import kafka.zookeeper._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
-import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException}
@@ -299,6 +299,20 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient,
isSecure: Boolean, time: T
}
}
+ /**
+ * Get a broker from ZK
+ * @return an optional Broker
+ */
+ def getBroker(brokerId: Int): Option[Broker] = {
+ val getDataRequest = GetDataRequest(BrokerIdZNode.path(brokerId))
+ val getDataResponse = retryRequestUntilConnected(getDataRequest)
+ getDataResponse.resultCode match {
+ case Code.OK =>
+ Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
+ case Code.NONODE => None
+ case _ => throw getDataResponse.resultException.get
+ }
+ }
/**
* Gets the list of sorted broker Ids
@@ -1174,6 +1188,67 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient,
isSecure: Boolean, time: T
}
}
+ /**
+ * Get the cluster id.
+ * @return optional cluster id in String.
+ */
+ def getClusterId: Option[String] = {
+ val getDataRequest = GetDataRequest(ClusterIdZNode.path)
+ val getDataResponse = retryRequestUntilConnected(getDataRequest)
+ getDataResponse.resultCode match {
+ case Code.OK => Some(ClusterIdZNode.fromJson(getDataResponse.data))
+ case Code.NONODE => None
+ case _ => throw getDataResponse.resultException.get
+ }
+ }
+
+ /**
+ * Create the cluster Id. If the cluster id already exists, return the
current cluster id.
+ * @return cluster id
+ */
+ def createOrGetClusterId(proposedClusterId: String): String = {
+ try {
+ createRecursive(ClusterIdZNode.path,
ClusterIdZNode.toJson(proposedClusterId))
+ proposedClusterId
+ } catch {
+ case e: NodeExistsException => getClusterId.getOrElse(
+ throw new KafkaException("Failed to get cluster id from Zookeeper.
This can happen if /cluster/id is deleted from Zookeeper."))
+ }
+ }
+
+ /**
+ * Generate a broker id by updating the broker sequence id path in ZK and
return the version of the path.
+ * The version is incremented by one on every update starting from 1.
+ * @return sequence number as the broker id
+ */
+ def generateBrokerSequenceId(): Int = {
+ val setDataRequest = SetDataRequest(BrokerSequenceIdZNode.path,
Array.empty[Byte], -1)
+ val setDataResponse = retryRequestUntilConnected(setDataRequest)
+ setDataResponse.resultCode match {
+ case Code.OK => setDataResponse.stat.getVersion
+ case Code.NONODE =>
+ // make sure the path exists
+ createRecursive(BrokerSequenceIdZNode.path, Array.empty[Byte],
throwIfPathExists = false)
+ generateBrokerSequenceId()
+ case _ => throw setDataResponse.resultException.get
+ }
+ }
+
+ /**
+ * Pre-create top level paths in ZK if needed.
+ */
+ def createTopLevelPaths(): Unit = {
+ ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists(_))
+ }
+
+ /**
+ * Make sure a persistent path exists in ZK.
+ * @param path
+ */
+ def makeSurePersistentPathExists(path: String): Unit = {
+ createRecursive(path, data = null, throwIfPathExists = false)
+ }
+
private def setConsumerOffset(group: String, topicPartition: TopicPartition,
offset: Long): SetDataResponse = {
val setDataRequest = SetDataRequest(ConsumerOffset.path(group,
topicPartition.topic, topicPartition.partition),
ConsumerOffset.encode(offset), ZkVersion.NoVersion)
@@ -1246,7 +1321,8 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient,
isSecure: Boolean, time: T
} else if (createResponse.resultCode == Code.NONODE) {
createRecursive0(parentPath(path))
createResponse = retryRequestUntilConnected(createRequest)
- createResponse.maybeThrow
+ if (throwIfPathExists || createResponse.resultCode != Code.NODEEXISTS)
+ createResponse.maybeThrow
} else if (createResponse.resultCode != Code.NODEEXISTS)
createResponse.maybeThrow
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala
b/core/src/main/scala/kafka/zk/ZkData.scala
index a03263c..9578129 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -21,12 +21,15 @@ import java.util.Properties
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
+import kafka.common.KafkaException
import kafka.controller.{IsrChangeNotificationHandler,
LeaderIsrAndControllerEpoch}
import kafka.security.auth.{Acl, Resource}
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
+import kafka.server.ConfigType
import kafka.utils.Json
import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.data.Stat
+
import scala.collection.JavaConverters._
// This file contains objects for encoding/decoding data stored in ZooKeeper
nodes (znodes).
@@ -338,3 +341,41 @@ object AclChangeNotificationSequenceZNode {
def encode(resourceName : String): Array[Byte] = resourceName.getBytes(UTF_8)
def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
}
+
+object ClusterIdZNode {
+ def path = "/cluster/id"
+
+ def toJson(id: String): Array[Byte] = {
+ Json.encodeAsBytes(Map("version" -> "1", "id" -> id).asJava)
+ }
+
+ def fromJson(clusterIdJson: Array[Byte]): String = {
+
Json.parseBytes(clusterIdJson).map(_.asJsonObject("id").to[String]).getOrElse {
+ throw new KafkaException(s"Failed to parse the cluster id json
$clusterIdJson")
+ }
+ }
+}
+
+object BrokerSequenceIdZNode {
+ def path = s"${BrokersZNode.path}/seqid"
+}
+
+object ProducerIdBlockZNode {
+ def path = "/latest_producer_id_block"
+}
+
+object ZkData {
+ // These are persistent ZK paths that should exist on kafka broker startup.
+ val PersistentZkPaths = Seq(
+ "/consumers", // old consumer path
+ BrokerIdsZNode.path,
+ TopicsZNode.path,
+ ConfigEntityChangeNotificationZNode.path,
+ ConfigEntityTypeZNode.path(ConfigType.Topic),
+ ConfigEntityTypeZNode.path(ConfigType.Client),
+ DeleteTopicsZNode.path,
+ BrokerSequenceIdZNode.path,
+ IsrChangeNotificationZNode.path,
+ ProducerIdBlockZNode.path,
+ LogDirEventNotificationZNode.path)
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index a0898da..6d786dc 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -17,9 +17,12 @@
package kafka.zookeeper
+import java.util.Locale
import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap,
CountDownLatch, Semaphore, TimeUnit}
+import com.yammer.metrics.core.{Gauge, MetricName}
+import kafka.metrics.KafkaMetricsGroup
import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
import kafka.utils.Logging
import org.apache.kafka.common.utils.Time
@@ -31,6 +34,7 @@ import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent,
Watcher, ZooKeeper}
import scala.collection.JavaConverters._
+import scala.collection.mutable.Set
/**
* A ZooKeeper client that encourages pipelined requests.
@@ -44,7 +48,9 @@ class ZooKeeperClient(connectString: String,
sessionTimeoutMs: Int,
connectionTimeoutMs: Int,
maxInFlightRequests: Int,
- time: Time) extends Logging {
+ time: Time,
+ metricGroup: String = "kafka.server",
+ metricType: String = "KafkaHealthcheck") extends Logging
with KafkaMetricsGroup {
this.logIdent = "[ZooKeeperClient] "
private val initializationLock = new ReentrantReadWriteLock()
private val isConnectedOrExpiredLock = new ReentrantLock()
@@ -54,10 +60,47 @@ class ZooKeeperClient(connectString: String,
private val inFlightRequests = new Semaphore(maxInFlightRequests)
private val stateChangeHandlers = new ConcurrentHashMap[String,
StateChangeHandler]().asScala
+ private val metricNames = Set[String]()
+
+ // The state map has to be created before creating ZooKeeper since it's
needed in the ZooKeeper callback.
+ private val stateToMeterMap = {
+ import KeeperState._
+ val stateToEventTypeMap = Map(
+ Disconnected -> "Disconnects",
+ SyncConnected -> "SyncConnects",
+ AuthFailed -> "AuthFailures",
+ ConnectedReadOnly -> "ReadOnlyConnects",
+ SaslAuthenticated -> "SaslAuthentications",
+ Expired -> "Expires"
+ )
+ stateToEventTypeMap.map { case (state, eventType) =>
+ val name = s"ZooKeeper${eventType}PerSec"
+ metricNames += name
+ state -> newMeter(name, eventType.toLowerCase(Locale.ROOT),
TimeUnit.SECONDS)
+ }
+ }
+
info(s"Initializing a new session to $connectString.")
@volatile private var zooKeeper = new ZooKeeper(connectString,
sessionTimeoutMs, ZooKeeperClientWatcher)
+
+ private val sessionStateGauge =
+ newGauge("SessionState", new Gauge[String] {
+ override def value: String =
+ Option(zooKeeper.getState.toString).getOrElse("DISCONNECTED")
+ })
+
+ metricNames += "SessionState"
+
waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+
+ /**
+ * This is added to preserve the original metric name in JMX
+ */
+ override def metricName(name: String, metricTags:
scala.collection.Map[String, String]): MetricName = {
+ explicitMetricName(metricGroup, metricType, name, metricTags)
+ }
+
/**
* Send a request and wait for its response. See handle(Seq[AsyncRequest])
for details.
*
@@ -259,6 +302,7 @@ class ZooKeeperClient(connectString: String,
zNodeChildChangeHandlers.clear()
stateChangeHandlers.clear()
zooKeeper.close()
+ metricNames.foreach(removeMetric(_))
info("Closed.")
}
@@ -269,25 +313,20 @@ class ZooKeeperClient(connectString: String,
private def initialize(): Unit = {
if (!zooKeeper.getState.isAlive) {
info(s"Initializing a new session to $connectString.")
- var now = System.currentTimeMillis()
- val threshold = now + connectionTimeoutMs
- while (now < threshold) {
+ // retry forever until ZooKeeper can be instantiated
+ while (true) {
try {
zooKeeper.close()
zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs,
ZooKeeperClientWatcher)
- waitUntilConnected(threshold - now, TimeUnit.MILLISECONDS)
return
} catch {
- case _: Exception =>
- now = System.currentTimeMillis()
- if (now < threshold) {
- Thread.sleep(1000)
- now = System.currentTimeMillis()
- }
+ case e: Exception =>
+ info("Error when recreating ZooKeeper", e)
+ Thread.sleep(1000)
}
}
info(s"Timed out waiting for connection during session initialization
while in state: ${zooKeeper.getState}")
- stateChangeHandlers.foreach {case (name, handler) =>
handler.onReconnectionTimeout()}
+ stateChangeHandlers.values.foreach(_.onReconnectionTimeout())
}
}
@@ -299,23 +338,26 @@ class ZooKeeperClient(connectString: String,
initialize()
}
- private object ZooKeeperClientWatcher extends Watcher {
+ // package level visibility for testing only
+ private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
override def process(event: WatchedEvent): Unit = {
debug("Received event: " + event)
Option(event.getPath) match {
case None =>
+ val state = event.getState
+ stateToMeterMap.get(state).foreach(_.mark())
inLock(isConnectedOrExpiredLock) {
isConnectedOrExpiredCondition.signalAll()
}
- if (event.getState == KeeperState.AuthFailed) {
+ if (state == KeeperState.AuthFailed) {
error("Auth failed.")
- stateChangeHandlers.foreach {case (name, handler) =>
handler.onAuthFailure()}
- } else if (event.getState == KeeperState.Expired) {
+ stateChangeHandlers.values.foreach(_.onAuthFailure())
+ } else if (state == KeeperState.Expired) {
inWriteLock(initializationLock) {
info("Session expired.")
- stateChangeHandlers.foreach {case (name, handler) =>
handler.beforeInitializingSession()}
+ stateChangeHandlers.values.foreach(_.beforeInitializingSession())
initialize()
- stateChangeHandlers.foreach {case (name, handler) =>
handler.afterInitializingSession()}
+ stateChangeHandlers.values.foreach(_.afterInitializingSession())
}
}
case Some(path) =>
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 4596887..167b6a6 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -205,15 +205,13 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
}
private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit
= {
- // Latency is rounded to milliseconds, so we may need to retry some
operations to get latency > 0.
- val (_, recorded) = TestUtils.computeUntilTrue({
- servers.head.zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
-
yammerMetricValue("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Double]
- })(latency => latency > 0.0)
- assertTrue("ZooKeeper latency not recorded", recorded)
-
- assertEquals(s"Unexpected ZK state
${server.zkUtils.zkConnection.getZookeeperState}",
- "CONNECTED", yammerMetricValue("SessionState"))
+ // Latency is rounded to milliseconds, so check the count instead.
+ val initialCount =
yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Long]
+ servers.head.zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
+ val newCount =
yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Long]
+ assertTrue("ZooKeeper latency not recorded", newCount > initialCount)
+
+ assertEquals(s"Unexpected ZK state", "CONNECTED",
yammerMetricValue("SessionState"))
}
private def verifyBrokerErrorMetrics(server: KafkaServer): Unit = {
@@ -276,6 +274,16 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
}
}
+ private def yammerHistogramCount(name: String): Any = {
+ val allMetrics = Metrics.defaultRegistry.allMetrics.asScala
+ val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.endsWith(name) }
+ .getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}"))
+ metric match {
+ case m: Histogram => m.count
+ case m => fail(s"Unexpected broker metric of class ${m.getClass}")
+ }
+ }
+
private def verifyYammerMetricRecorded(name: String, verify: Double => Boolean = d => d > 0): Double = {
val metricValue = yammerMetricValue(name).asInstanceOf[Double]
assertTrue(s"Broker metric not recorded correctly for $name value $metricValue", verify(metricValue))
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 7a88237..fb594d0 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -445,35 +445,35 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
// create a topic with a few config overrides and check that they are applied
val maxMessageSize = 1024
val retentionMs = 1000 * 1000
- AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+ adminZkClient.createTopic(topic, partitions, 1, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
//Standard topic configs will be propagated at topic creation time, but the quota manager will not have been updated.
checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", false)
//Update dynamically and all properties should be applied
- AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+ adminZkClient.changeTopicConfig(topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", true)
// now double the config values for the topic and check that it is applied
val newConfig = makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*")
- AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*"))
+ adminZkClient.changeTopicConfig(topic, makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*"))
checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*", quotaManagerIsThrottled = true)
// Verify that the same config can be read from ZK
- val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Topic, topic)
+ val configInZk = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
assertEquals(newConfig, configInZk)
//Now delete the config
- AdminUtils.changeTopicConfig(server.zkUtils, topic, new Properties)
+ adminZkClient.changeTopicConfig(topic, new Properties)
checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false)
//Add config back
- AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+ adminZkClient.changeTopicConfig(topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", quotaManagerIsThrottled = true)
//Now ensure updating to "" removes the throttled replica list also
- AdminUtils.changeTopicConfig(server.zkUtils, topic, propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""), (LogConfig.LeaderReplicationThrottledReplicasProp, "")))
+ adminZkClient.changeTopicConfig(topic, propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""), (LogConfig.LeaderReplicationThrottledReplicasProp, "")))
checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false)
}
@@ -494,27 +494,27 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
val limit: Long = 1000000
// Set the limit & check it is applied to the log
- changeBrokerConfig(zkUtils, brokerIds, propsWith(
+ adminZkClient.changeBrokerConfig(brokerIds, propsWith(
(LeaderReplicationThrottledRateProp, limit.toString),
(FollowerReplicationThrottledRateProp, limit.toString)))
checkConfig(limit)
// Now double the config values for the topic and check that it is applied
val newLimit = 2 * limit
- changeBrokerConfig(zkUtils, brokerIds, propsWith(
+ adminZkClient.changeBrokerConfig(brokerIds, propsWith(
(LeaderReplicationThrottledRateProp, newLimit.toString),
(FollowerReplicationThrottledRateProp, newLimit.toString)))
checkConfig(newLimit)
// Verify that the same config can be read from ZK
for (brokerId <- brokerIds) {
- val configInZk = AdminUtils.fetchEntityConfig(servers(brokerId).zkUtils, ConfigType.Broker, brokerId.toString)
+ val configInZk = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
assertEquals(newLimit, configInZk.getProperty(LeaderReplicationThrottledRateProp).toInt)
assertEquals(newLimit, configInZk.getProperty(FollowerReplicationThrottledRateProp).toInt)
}
//Now delete the config
- changeBrokerConfig(servers(0).zkUtils, brokerIds, new Properties)
+ adminZkClient.changeBrokerConfig(brokerIds, new Properties)
checkConfig(DefaultReplicationThrottledRate)
}
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index bcddd40..135f7f1 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -23,6 +23,7 @@ import kafka.utils.TestUtils._
import kafka.api.FetchRequestBuilder
import kafka.message.ByteBufferMessageSet
import java.io.File
+import java.net.UnknownHostException
import kafka.log.LogManager
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -31,6 +32,7 @@ import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerialize
import org.I0Itec.zkclient.exception.ZkException
import org.junit.{Before, Test}
import org.junit.Assert._
+
import scala.reflect.ClassTag
class ServerShutdownTest extends ZooKeeperTestHarness {
@@ -130,7 +132,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
val newProps = TestUtils.createBrokerConfig(0, zkConnect)
newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
val newConfig = KafkaConfig.fromProps(newProps)
- verifyCleanShutdownAfterFailedStartup[ZkException](newConfig)
+ verifyCleanShutdownAfterFailedStartup[UnknownHostException](newConfig)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index eedf5f3..ff9178a 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -17,8 +17,6 @@
package kafka.server
-import java.net.BindException
-
import kafka.common.KafkaException
import kafka.utils.{TestUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
diff --git a/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala b/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
deleted file mode 100644
index fda17c0..0000000
--- a/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.api.ApiVersion
-import kafka.utils.ZkUtils
-import org.I0Itec.zkclient.ZkClient
-import org.apache.zookeeper.Watcher
-import org.easymock.EasyMock
-import org.junit.{Assert, Test}
-import Assert._
-import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.Meter
-import scala.collection.JavaConverters._
-
-class SessionExpireListenerTest {
-
- private val brokerId = 1
-
- private def cleanMetricsRegistry() {
- val metrics = Metrics.defaultRegistry
- metrics.allMetrics.keySet.asScala.foreach(metrics.removeMetric)
- }
-
- @Test
- def testSessionExpireListenerMetrics() {
-
- cleanMetricsRegistry()
-
- val metrics = Metrics.defaultRegistry
-
- def checkMeterCount(name: String, expected: Long) {
- val meter = metrics.allMetrics.asScala.collectFirst {
- case (metricName, meter: Meter) if metricName.getName == name => meter
- }.getOrElse(sys.error(s"Unable to find meter with name $name"))
- assertEquals(s"Unexpected meter count for $name", expected, meter.count)
- }
-
- val zkClient = EasyMock.mock(classOf[ZkClient])
- val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
- import Watcher._
- val healthcheck = new KafkaHealthcheck(brokerId, Seq.empty, zkUtils, None, ApiVersion.latestVersion)
-
- val expiresPerSecName = "ZooKeeperExpiresPerSec"
- val disconnectsPerSecName = "ZooKeeperDisconnectsPerSec"
- checkMeterCount(expiresPerSecName, 0)
- checkMeterCount(disconnectsPerSecName, 0)
-
- healthcheck.sessionExpireListener.handleStateChanged(Event.KeeperState.Expired)
- checkMeterCount(expiresPerSecName, 1)
- checkMeterCount(disconnectsPerSecName, 0)
-
- healthcheck.sessionExpireListener.handleStateChanged(Event.KeeperState.Disconnected)
- checkMeterCount(expiresPerSecName, 1)
- checkMeterCount(disconnectsPerSecName, 1)
- }
-
-}
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index ecaa943..f0d6cf0 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -19,13 +19,17 @@ package kafka.zk
import java.util.{Properties, UUID}
import java.nio.charset.StandardCharsets.UTF_8
+import kafka.api.ApiVersion
+import kafka.cluster.EndPoint
import kafka.log.LogConfig
import kafka.security.auth._
import kafka.server.ConfigType
+import kafka.utils.CoreUtils
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.zookeeper.KeeperException.NodeExistsException
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert._
import org.junit.Test
class KafkaZkClientTest extends ZooKeeperTestHarness {
@@ -412,6 +416,44 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
+ def testBrokerRegistrationMethods() {
+ zkClient.createTopLevelPaths()
+
+ val brokerInfo = new BrokerInfo(1, "test.host", 9999,
+ Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
+ 9998, None, ApiVersion.latestVersion)
+
+ zkClient.registerBrokerInZk(brokerInfo)
+ val broker = zkClient.getBroker(1).getOrElse(fail("Unregistered broker"))
+
+ assertEquals(brokerInfo.id, broker.id)
+ assertEquals(brokerInfo.endpoints(), broker.endPoints.mkString(","))
+ }
+
+ @Test
+ def testClusterIdMethods() {
+ val clusterId = CoreUtils.generateUuidAsBase64
+
+ zkClient.createOrGetClusterId(clusterId)
+ assertEquals(clusterId, zkClient.getClusterId.getOrElse(fail("No cluster id found")))
+ }
+
+ @Test
+ def testBrokerSequenceIdMethods() {
+ val sequenceId = zkClient.generateBrokerSequenceId()
+ assertEquals(sequenceId + 1, zkClient.generateBrokerSequenceId)
+ }
+
+ @Test
+ def testCreateTopLevelPaths() {
+ zkClient.createTopLevelPaths()
+
+ ZkData.PersistentZkPaths.foreach {
+ path => assertTrue(zkClient.pathExists(path))
+ }
+ }
+
+ @Test
def testPreferredReplicaElectionMethods() {
assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 75842f0..141dcee 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -23,18 +23,29 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
import javax.security.auth.login.Configuration
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Meter
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
import org.apache.kafka.common.utils.Time
-import org.apache.zookeeper.{CreateMode, ZooDefs}
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
+import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
+import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
-import org.junit.{After, Test}
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
class ZooKeeperClientTest extends ZooKeeperTestHarness {
private val mockPath = "/foo"
private val time = Time.SYSTEM
+ @Before
+ override def setUp() {
+ cleanMetricsRegistry()
+ super.setUp()
+ }
+
@After
override def tearDown() {
super.tearDown()
@@ -360,5 +371,35 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
}
}
+ @Test
+ def testSessionExpireListenerMetrics() {
+ val metrics = Metrics.defaultRegistry
+
+ def checkMeterCount(name: String, expected: Long) {
+ val meter = metrics.allMetrics.asScala.collectFirst {
+ case (metricName, meter: Meter) if metricName.getName == name => meter
+ }.getOrElse(sys.error(s"Unable to find meter with name $name"))
+ assertEquals(s"Unexpected meter count for $name", expected, meter.count)
+ }
+
+ val expiresPerSecName = "ZooKeeperExpiresPerSec"
+ val disconnectsPerSecName = "ZooKeeperDisconnectsPerSec"
+ checkMeterCount(expiresPerSecName, 0)
+ checkMeterCount(disconnectsPerSecName, 0)
+
+ zooKeeperClient.ZooKeeperClientWatcher.process(new WatchedEvent(EventType.None, KeeperState.Expired, null))
+ checkMeterCount(expiresPerSecName, 1)
+ checkMeterCount(disconnectsPerSecName, 0)
+
+ zooKeeperClient.ZooKeeperClientWatcher.process(new WatchedEvent(EventType.None, KeeperState.Disconnected, null))
+ checkMeterCount(expiresPerSecName, 1)
+ checkMeterCount(disconnectsPerSecName, 1)
+ }
+
+ private def cleanMetricsRegistry() {
+ val metrics = Metrics.defaultRegistry
+ metrics.allMetrics.keySet.asScala.foreach(metrics.removeMetric)
+ }
+
private def bytes = UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8)
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 71758a5..0551fac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -97,7 +97,8 @@ public class InternalTopicIntegrationTest {
CLUSTER.zKConnectString(),
DEFAULT_ZK_SESSION_TIMEOUT_MS,
DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
- Integer.MAX_VALUE, Time.SYSTEM);
+ Integer.MAX_VALUE, Time.SYSTEM,
+ "testMetricGroup", "testMetricType");
final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, Time.SYSTEM);
try {
final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index e277d82..275d580 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -177,7 +177,9 @@ public class KafkaEmbedded {
DEFAULT_ZK_SESSION_TIMEOUT_MS,
DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
Integer.MAX_VALUE,
- Time.SYSTEM);
+ Time.SYSTEM,
+ "testMetricGroup",
+ "testMetricType");
final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, Time.SYSTEM);
final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
adminZkClient.createTopic(topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
@@ -192,7 +194,9 @@ public class KafkaEmbedded {
DEFAULT_ZK_SESSION_TIMEOUT_MS,
DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
Integer.MAX_VALUE,
- Time.SYSTEM);
+ Time.SYSTEM,
+ "testMetricGroup",
+ "testMetricType");
final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, Time.SYSTEM);
final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
adminZkClient.deleteTopic(topic);
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].