[
https://issues.apache.org/jira/browse/KAFKA-6320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301235#comment-16301235
]
ASF GitHub Bot commented on KAFKA-6320:
---------------------------------------
ijuma closed pull request #4351: KAFKA-6320: move ZK metrics in
KafkaHealthCheck to ZookeeperClient
URL: https://github.com/apache/kafka/pull/4351
This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index f2728517b1f..ca8422e9d40 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1416,13 +1416,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
override def process(): Unit = {
zkClient.registerBrokerInZk(brokerInfo)
- val wasActiveBeforeChange = isActive
-
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
- activeControllerId = zkClient.getControllerId.getOrElse(-1)
- if (wasActiveBeforeChange && !isActive) {
- onControllerResignation()
- }
- elect()
+ Reelect.process()
}
}
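For context, the broker re-registration path above now delegates to the controller's existing Reelect event instead of repeating the resignation/election steps inline. A sketch of what that event presumably does, mirroring the lines removed above (illustrative, not copied verbatim from this PR):

    // Sketch only: assumes Reelect duplicates the logic removed above.
    case object Reelect extends ControllerEvent {
      override def process(): Unit = {
        val wasActiveBeforeChange = isActive
        zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
        activeControllerId = zkClient.getControllerId.getOrElse(-1)
        if (wasActiveBeforeChange && !isActive)
          onControllerResignation()   // step down if another broker won the election
        elect()                       // then (re-)attempt election
      }
    }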
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 716e3d1a7fc..9c815bc0589 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -20,7 +20,7 @@ import java.nio.charset.StandardCharsets
import kafka.common.KafkaException
import kafka.utils.{Json, Logging, ZkUtils}
-import kafka.zk.KafkaZkClient
+import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import scala.collection.JavaConverters._
@@ -87,7 +87,7 @@ class ProducerIdManager(val brokerId: Int, val zkClient:
KafkaZkClient) extends
var zkWriteComplete = false
while (!zkWriteComplete) {
// refresh current producerId block from zookeeper again
- val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ZkUtils.ProducerIdBlockPath)
+ val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
// generate the new producerId block
currentProducerIdBlock = dataOpt match {
@@ -110,7 +110,7 @@ class ProducerIdManager(val brokerId: Int, val zkClient:
KafkaZkClient) extends
val newProducerIdBlockData =
ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
// try to write the new producerId block into zookeeper
- val (succeeded, version) = zkClient.conditionalUpdatePath(ZkUtils.ProducerIdBlockPath,
+ val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
zkWriteComplete = succeeded
@@ -122,7 +122,7 @@ class ProducerIdManager(val brokerId: Int, val zkClient:
KafkaZkClient) extends
private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path:
String, expectedData: Array[Byte]): (Boolean, Int) = {
try {
val expectedPidBlock =
ProducerIdManager.parseProducerIdBlockData(expectedData)
- zkClient.getDataAndVersion(ZkUtils.ProducerIdBlockPath) match {
+ zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
case (Some(data), zkVersion) =>
val currProducerIdBLock =
ProducerIdManager.parseProducerIdBlockData(data)
(currProducerIdBLock == expectedPidBlock, zkVersion)
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 80d85a0d2c6..74bc80938d4 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -92,7 +92,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
val zkMaxInFlightRequests =
configs.get(SimpleAclAuthorizer.ZkMaxInFlightRequests).map(_.toString.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
val time = Time.SYSTEM
- val zooKeeperClient = new ZooKeeperClient(zkUrl, zkSessionTimeOutMs, zkConnectionTimeoutMs, zkMaxInFlightRequests, time)
+ val zooKeeperClient = new ZooKeeperClient(zkUrl, zkSessionTimeOutMs, zkConnectionTimeoutMs, zkMaxInFlightRequests,
+ time, "kafka.security", "SimpleAclAuthorizer")
zkClient = new KafkaZkClient(zooKeeperClient,
kafkaConfig.zkEnableSecureAcls, time)
zkClient.createAclPaths()
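The two extra constructor arguments give the authorizer's ZooKeeper client its own metric group and type. A minimal usage sketch of the extended constructor; the connection values are illustrative and only the last two arguments are new:

    // Sketch only: connection values are illustrative, not taken from this PR.
    val zooKeeperClient = new ZooKeeperClient(
      "localhost:2181",        // connectString
      6000,                    // sessionTimeoutMs
      6000,                    // connectionTimeoutMs
      Int.MaxValue,            // maxInFlightRequests
      Time.SYSTEM,
      "kafka.security",        // metricGroup
      "SimpleAclAuthorizer")   // metricType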
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
deleted file mode 100644
index 2ad816868f6..00000000000
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.net.InetAddress
-import java.util.Locale
-import java.util.concurrent.TimeUnit
-
-import kafka.api.ApiVersion
-import kafka.cluster.EndPoint
-import kafka.metrics.KafkaMetricsGroup
-import kafka.utils._
-import com.yammer.metrics.core.Gauge
-import org.I0Itec.zkclient.IZkStateListener
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.zookeeper.Watcher.Event.KeeperState
-
-import scala.collection.mutable.Set
-
-/**
- * This class registers the broker in zookeeper to allow
- * other brokers and consumers to detect failures. It uses an ephemeral znode
with the path:
- * /brokers/ids/[0...N] --> advertisedHost:advertisedPort
- *
- * Right now our definition of health is fairly naive. If we register in zk
we are healthy, otherwise
- * we are dead.
- */
-class KafkaHealthcheck(brokerId: Int,
- advertisedEndpoints: Seq[EndPoint],
- zkUtils: ZkUtils,
- rack: Option[String],
- interBrokerProtocolVersion: ApiVersion) extends Logging
{
-
- private[server] val sessionExpireListener = new SessionExpireListener
-
- def startup() {
- zkUtils.subscribeStateChanges(sessionExpireListener)
- // registration is done in KafkaServer now
- }
-
- def shutdown(): Unit = sessionExpireListener.shutdown()
-
- /**
- * When we get a SessionExpired event, it means that we have lost all
ephemeral nodes and ZKClient has re-established
- * a connection for us. We need to re-register this broker in the broker
registry. We rely on `handleStateChanged`
- * to record ZooKeeper connection state metrics.
- */
- class SessionExpireListener extends IZkStateListener with KafkaMetricsGroup {
-
- private val metricNames = Set[String]()
-
- private[server] val stateToMeterMap = {
- import KeeperState._
- val stateToEventTypeMap = Map(
- Disconnected -> "Disconnects",
- SyncConnected -> "SyncConnects",
- AuthFailed -> "AuthFailures",
- ConnectedReadOnly -> "ReadOnlyConnects",
- SaslAuthenticated -> "SaslAuthentications",
- Expired -> "Expires"
- )
- stateToEventTypeMap.map { case (state, eventType) =>
- val name = s"ZooKeeper${eventType}PerSec"
- metricNames += name
- state -> newMeter(name, eventType.toLowerCase(Locale.ROOT),
TimeUnit.SECONDS)
- }
- }
-
- private[server] val sessionStateGauge =
- newGauge("SessionState", new Gauge[String] {
- override def value: String =
-
Option(zkUtils.zkConnection.getZookeeperState.toString).getOrElse("DISCONNECTED")
- })
-
- metricNames += "SessionState"
-
- @throws[Exception]
- override def handleStateChanged(state: KeeperState) {
- stateToMeterMap.get(state).foreach(_.mark())
- }
-
- @throws[Exception]
- override def handleNewSession() {
- //info("re-registering broker info in ZK for broker " + brokerId)
- //register()
- //info("done re-registering broker")
- //info("Subscribing to %s path to watch for new
topics".format(ZkUtils.BrokerTopicsPath))
- }
-
- override def handleSessionEstablishmentError(err: Throwable) {
- error("Could not establish session with zookeeper", err)
- }
-
- def shutdown(): Unit = metricNames.foreach(removeMetric(_))
-
- }
-
-}
\ No newline at end of file
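The meters and SessionState gauge deleted here are re-registered by ZooKeeperClient further down, whose default metric group/type ("kafka.server"/"KafkaHealthcheck") preserve the old JMX object names. A hedged lookup sketch, assuming the default yammer JMX reporter naming convention:

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName

    // Illustrative only: reads the relocated gauge under its preserved JMX name.
    val mbeanServer = ManagementFactory.getPlatformMBeanServer
    val sessionState = mbeanServer.getAttribute(
      new ObjectName("kafka.server:type=KafkaHealthcheck,name=SessionState"), "Value")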
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5bd3f8eec6a..86432337017 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.{AtomicBoolean,
AtomicInteger}
import com.yammer.metrics.core.Gauge
import kafka.api.KAFKA_0_9_0
import kafka.cluster.{Broker, EndPoint}
-import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
+import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException,
KafkaException}
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
@@ -133,11 +133,9 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
- var kafkaHealthcheck: KafkaHealthcheck = null
var metadataCache: MetadataCache = null
var quotaManagers: QuotaFactory.QuotaManagers = null
- var zkUtils: ZkUtils = null
private var _zkClient: KafkaZkClient = null
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
@@ -199,10 +197,10 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
kafkaScheduler.startup()
/* setup zookeeper */
- zkUtils = initZk()
+ initZkClient(time)
/* Get or create cluster_id */
- _clusterId = getOrGenerateClusterId(zkUtils)
+ _clusterId = getOrGenerateClusterId(zkClient)
info(s"Cluster ID = $clusterId")
/* generate brokerId */
@@ -226,10 +224,6 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
- val zooKeeperClient = new ZooKeeperClient(config.zkConnect,
config.zkSessionTimeoutMs,
- config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time)
- _zkClient = new KafkaZkClient(zooKeeperClient, zkUtils.isSecure, time)
-
/* start log manager */
logManager = LogManager(config, initialOfflineDirs, zkClient,
brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()
@@ -252,12 +246,6 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
endpoint
}
- // to be cleaned up in KAFKA-6320
- kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners,
zkUtils, config.rack,
- config.interBrokerProtocolVersion)
- kafkaHealthcheck.startup()
- // KAFKA-6320
-
val updatedEndpoints = listeners.map(endpoint =>
if (endpoint.host == null || endpoint.host.trim.isEmpty)
endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
@@ -277,7 +265,7 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
updatedEndpoints, jmxPort, config.rack,
config.interBrokerProtocolVersion)
zkClient.registerBrokerInZk(brokerInfo)
- // Now that the broker id is successfully registered via
KafkaHealthcheck, checkpoint it
+ // Now that the broker id is successfully registered, checkpoint it
checkpointBrokerId(config.brokerId)
/* start kafka controller */
@@ -351,7 +339,7 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler,
logManager, isShuttingDown, quotaManagers,
brokerTopicStats, metadataCache, logDirFailureChannel)
- private def initZk(): ZkUtils = {
+ private def initZkClient(time: Time): Unit = {
info(s"Connecting to zookeeper on ${config.zkConnect}")
val chrootIndex = config.zkConnect.indexOf("/")
@@ -366,29 +354,25 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
if (secureAclsEnabled && !isZkSecurityEnabled)
throw new
java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true,
but the verification of the JAAS login file failed.")
+ // make sure chroot path exists
chrootOption.foreach { chroot =>
val zkConnForChrootCreation = config.zkConnect.substring(0, chrootIndex)
- val zkClientForChrootCreation = ZkUtils.withMetrics(zkConnForChrootCreation,
- sessionTimeout = config.zkSessionTimeoutMs,
- connectionTimeout = config.zkConnectionTimeoutMs,
- secureAclsEnabled,
- time)
- zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
+ val zooKeeperClient = new ZooKeeperClient(zkConnForChrootCreation, config.zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time)
+ val zkClient = new KafkaZkClient(zooKeeperClient, secureAclsEnabled, time)
+ zkClient.makeSurePersistentPathExists(chroot)
info(s"Created zookeeper path $chroot")
- zkClientForChrootCreation.close()
+ zkClient.close()
}
- val zkUtils = ZkUtils.withMetrics(config.zkConnect,
- sessionTimeout = config.zkSessionTimeoutMs,
- connectionTimeout = config.zkConnectionTimeoutMs,
- secureAclsEnabled,
- time)
- zkUtils.setupCommonPaths()
- zkUtils
+ val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time)
+ _zkClient = new KafkaZkClient(zooKeeperClient, secureAclsEnabled, time)
+ _zkClient.createTopLevelPaths()
}
- def getOrGenerateClusterId(zkUtils: ZkUtils): String = {
- zkUtils.getClusterId.getOrElse(zkUtils.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
+ def getOrGenerateClusterId(zkClient: KafkaZkClient): String = {
+ zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
}
/**
@@ -455,9 +439,9 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
// Get the current controller info. This is to ensure we use the
most recent info to issue the
// controlled shutdown request
- val controllerId = zkUtils.getController()
+ val controllerId = zkClient.getControllerId.getOrElse(throw new
KafkaException("Controller doesn't exist"))
//If this method returns None ignore and try again
- zkUtils.getBrokerInfo(controllerId).foreach { broker =>
+ zkClient.getBroker(controllerId).foreach { broker =>
// if this is the first attempt, if the controller has changed or
if an exception was thrown in a previous
// attempt, connect to the most recent controller
if (ioException || broker != prevController) {
@@ -549,9 +533,6 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
CoreUtils.swallow(controlledShutdown(), this)
brokerState.newState(BrokerShuttingDown)
- if (kafkaHealthcheck != null)
- CoreUtils.swallow(kafkaHealthcheck.shutdown(), this)
-
if (dynamicConfigManager != null)
CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
@@ -582,8 +563,7 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
if (kafkaController != null)
CoreUtils.swallow(kafkaController.shutdown(), this)
- if (zkUtils != null)
- CoreUtils.swallow(zkUtils.close(), this)
+
if (zkClient != null)
CoreUtils.swallow(zkClient.close(), this)
@@ -689,9 +669,14 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
}
}
+ /**
+ * Return a sequence id generated by updating the broker sequence id path
in ZK.
+ * Users can provide brokerId in the config. To avoid conflicts between ZK
generated
+ * sequence id and configured brokerId, we increment the generated sequence
id by KafkaConfig.MaxReservedBrokerId.
+ */
private def generateBrokerId: Int = {
try {
- zkUtils.getBrokerSequenceId(config.maxReservedBrokerId)
+ zkClient.generateBrokerSequenceId() + config.maxReservedBrokerId
} catch {
case e: Exception =>
error("Failed to generate broker.id due to ", e)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 13fd024665f..8c3f01860aa 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -18,10 +18,10 @@ package kafka.zk
import java.util.Properties
-
import com.yammer.metrics.core.MetricName
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
+import kafka.common.KafkaException
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
@@ -32,7 +32,7 @@ import kafka.utils._
import kafka.zookeeper._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
-import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException}
@@ -299,6 +299,20 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient,
isSecure: Boolean, time: T
}
}
+ /**
+ * Get a broker from ZK
+ * @return an optional Broker
+ */
+ def getBroker(brokerId: Int): Option[Broker] = {
+ val getDataRequest = GetDataRequest(BrokerIdZNode.path(brokerId))
+ val getDataResponse = retryRequestUntilConnected(getDataRequest)
+ getDataResponse.resultCode match {
+ case Code.OK =>
+ Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
+ case Code.NONODE => None
+ case _ => throw getDataResponse.resultException.get
+ }
+ }
/**
* Gets the list of sorted broker Ids
@@ -1174,6 +1188,67 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient,
isSecure: Boolean, time: T
}
}
+ /**
+ * Get the cluster id.
+ * @return optional cluster id in String.
+ */
+ def getClusterId: Option[String] = {
+ val getDataRequest = GetDataRequest(ClusterIdZNode.path)
+ val getDataResponse = retryRequestUntilConnected(getDataRequest)
+ getDataResponse.resultCode match {
+ case Code.OK => Some(ClusterIdZNode.fromJson(getDataResponse.data))
+ case Code.NONODE => None
+ case _ => throw getDataResponse.resultException.get
+ }
+ }
+
+ /**
+ * Create the cluster Id. If the cluster id already exists, return the
current cluster id.
+ * @return cluster id
+ */
+ def createOrGetClusterId(proposedClusterId: String): String = {
+ try {
+ createRecursive(ClusterIdZNode.path,
ClusterIdZNode.toJson(proposedClusterId))
+ proposedClusterId
+ } catch {
+ case e: NodeExistsException => getClusterId.getOrElse(
+ throw new KafkaException("Failed to get cluster id from Zookeeper.
This can happen if /cluster/id is deleted from Zookeeper."))
+ }
+ }
+
+ /**
+ * Generate a broker id by updating the broker sequence id path in ZK and
return the version of the path.
+ * The version is incremented by one on every update starting from 1.
+ * @return sequence number as the broker id
+ */
+ def generateBrokerSequenceId(): Int = {
+ val setDataRequest = SetDataRequest(BrokerSequenceIdZNode.path,
Array.empty[Byte], -1)
+ val setDataResponse = retryRequestUntilConnected(setDataRequest)
+ setDataResponse.resultCode match {
+ case Code.OK => setDataResponse.stat.getVersion
+ case Code.NONODE =>
+ // make sure the path exists
+ createRecursive(BrokerSequenceIdZNode.path, Array.empty[Byte],
throwIfPathExists = false)
+ generateBrokerSequenceId()
+ case _ => throw setDataResponse.resultException.get
+ }
+ }
+
+ /**
+ * Pre-create top level paths in ZK if needed.
+ */
+ def createTopLevelPaths(): Unit = {
+ ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists(_))
+ }
+
+ /**
+ * Make sure a persistent path exists in ZK.
+ * @param path
+ */
+ def makeSurePersistentPathExists(path: String): Unit = {
+ createRecursive(path, data = null, throwIfPathExists = false)
+ }
+
private def setConsumerOffset(group: String, topicPartition: TopicPartition,
offset: Long): SetDataResponse = {
val setDataRequest = SetDataRequest(ConsumerOffset.path(group,
topicPartition.topic, topicPartition.partition),
ConsumerOffset.encode(offset), ZkVersion.NoVersion)
@@ -1246,7 +1321,8 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient,
isSecure: Boolean, time: T
} else if (createResponse.resultCode == Code.NONODE) {
createRecursive0(parentPath(path))
createResponse = retryRequestUntilConnected(createRequest)
- createResponse.maybeThrow
+ if (throwIfPathExists || createResponse.resultCode != Code.NODEEXISTS)
+ createResponse.maybeThrow
} else if (createResponse.resultCode != Code.NODEEXISTS)
createResponse.maybeThrow
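A combined usage sketch of the helpers added above (createTopLevelPaths, getClusterId, createOrGetClusterId, generateBrokerSequenceId, getBroker), mirroring how KafkaServer and the tests below call them; an initialized KafkaZkClient named zkClient is assumed to be in scope:

    // Sketch only: assumes an initialized KafkaZkClient named zkClient.
    zkClient.createTopLevelPaths()                                 // ensure persistent parent paths exist
    val clusterId  = zkClient.getClusterId
      .getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
    val brokerId   = zkClient.generateBrokerSequenceId() + 1000    // 1000 ~ config.maxReservedBrokerId
    val controller = zkClient.getControllerId.flatMap(zkClient.getBroker)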
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala
b/core/src/main/scala/kafka/zk/ZkData.scala
index a03263c6963..95781299197 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -21,12 +21,15 @@ import java.util.Properties
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
+import kafka.common.KafkaException
import kafka.controller.{IsrChangeNotificationHandler,
LeaderIsrAndControllerEpoch}
import kafka.security.auth.{Acl, Resource}
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
+import kafka.server.ConfigType
import kafka.utils.Json
import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.data.Stat
+
import scala.collection.JavaConverters._
// This file contains objects for encoding/decoding data stored in ZooKeeper
nodes (znodes).
@@ -338,3 +341,41 @@ object AclChangeNotificationSequenceZNode {
def encode(resourceName : String): Array[Byte] = resourceName.getBytes(UTF_8)
def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
}
+
+object ClusterIdZNode {
+ def path = "/cluster/id"
+
+ def toJson(id: String): Array[Byte] = {
+ Json.encodeAsBytes(Map("version" -> "1", "id" -> id).asJava)
+ }
+
+ def fromJson(clusterIdJson: Array[Byte]): String = {
+
Json.parseBytes(clusterIdJson).map(_.asJsonObject("id").to[String]).getOrElse {
+ throw new KafkaException(s"Failed to parse the cluster id json
$clusterIdJson")
+ }
+ }
+}
+
+object BrokerSequenceIdZNode {
+ def path = s"${BrokersZNode.path}/seqid"
+}
+
+object ProducerIdBlockZNode {
+ def path = "/latest_producer_id_block"
+}
+
+object ZkData {
+ // These are persistent ZK paths that should exist on kafka broker startup.
+ val PersistentZkPaths = Seq(
+ "/consumers", // old consumer path
+ BrokerIdsZNode.path,
+ TopicsZNode.path,
+ ConfigEntityChangeNotificationZNode.path,
+ ConfigEntityTypeZNode.path(ConfigType.Topic),
+ ConfigEntityTypeZNode.path(ConfigType.Client),
+ DeleteTopicsZNode.path,
+ BrokerSequenceIdZNode.path,
+ IsrChangeNotificationZNode.path,
+ ProducerIdBlockZNode.path,
+ LogDirEventNotificationZNode.path)
+}
\ No newline at end of file
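A round-trip sketch for the new ClusterIdZNode codec; the cluster id value is illustrative:

    // Encodes to {"version":"1","id":"<cluster id>"} and parses the id back out.
    val bytes     = ClusterIdZNode.toJson("4L6g3nShT-eMCtK--X86sw")
    val clusterId = ClusterIdZNode.fromJson(bytes)   // "4L6g3nShT-eMCtK--X86sw"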
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index a0898dadd06..6d786dcc73c 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -17,9 +17,12 @@
package kafka.zookeeper
+import java.util.Locale
import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap,
CountDownLatch, Semaphore, TimeUnit}
+import com.yammer.metrics.core.{Gauge, MetricName}
+import kafka.metrics.KafkaMetricsGroup
import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
import kafka.utils.Logging
import org.apache.kafka.common.utils.Time
@@ -31,6 +34,7 @@ import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent,
Watcher, ZooKeeper}
import scala.collection.JavaConverters._
+import scala.collection.mutable.Set
/**
* A ZooKeeper client that encourages pipelined requests.
@@ -44,7 +48,9 @@ class ZooKeeperClient(connectString: String,
sessionTimeoutMs: Int,
connectionTimeoutMs: Int,
maxInFlightRequests: Int,
- time: Time) extends Logging {
+ time: Time,
+ metricGroup: String = "kafka.server",
+ metricType: String = "KafkaHealthcheck") extends Logging
with KafkaMetricsGroup {
this.logIdent = "[ZooKeeperClient] "
private val initializationLock = new ReentrantReadWriteLock()
private val isConnectedOrExpiredLock = new ReentrantLock()
@@ -54,10 +60,47 @@ class ZooKeeperClient(connectString: String,
private val inFlightRequests = new Semaphore(maxInFlightRequests)
private val stateChangeHandlers = new ConcurrentHashMap[String,
StateChangeHandler]().asScala
+ private val metricNames = Set[String]()
+
+ // The state map has to be created before creating ZooKeeper since it's
needed in the ZooKeeper callback.
+ private val stateToMeterMap = {
+ import KeeperState._
+ val stateToEventTypeMap = Map(
+ Disconnected -> "Disconnects",
+ SyncConnected -> "SyncConnects",
+ AuthFailed -> "AuthFailures",
+ ConnectedReadOnly -> "ReadOnlyConnects",
+ SaslAuthenticated -> "SaslAuthentications",
+ Expired -> "Expires"
+ )
+ stateToEventTypeMap.map { case (state, eventType) =>
+ val name = s"ZooKeeper${eventType}PerSec"
+ metricNames += name
+ state -> newMeter(name, eventType.toLowerCase(Locale.ROOT),
TimeUnit.SECONDS)
+ }
+ }
+
info(s"Initializing a new session to $connectString.")
@volatile private var zooKeeper = new ZooKeeper(connectString,
sessionTimeoutMs, ZooKeeperClientWatcher)
+
+ private val sessionStateGauge =
+ newGauge("SessionState", new Gauge[String] {
+ override def value: String =
+ Option(zooKeeper.getState.toString).getOrElse("DISCONNECTED")
+ })
+
+ metricNames += "SessionState"
+
waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+
+ /**
+ * This is added to preserve the original metric name in JMX
+ */
+ override def metricName(name: String, metricTags:
scala.collection.Map[String, String]): MetricName = {
+ explicitMetricName(metricGroup, metricType, name, metricTags)
+ }
+
/**
* Send a request and wait for its response. See handle(Seq[AsyncRequest])
for details.
*
@@ -259,6 +302,7 @@ class ZooKeeperClient(connectString: String,
zNodeChildChangeHandlers.clear()
stateChangeHandlers.clear()
zooKeeper.close()
+ metricNames.foreach(removeMetric(_))
info("Closed.")
}
@@ -269,25 +313,20 @@ class ZooKeeperClient(connectString: String,
private def initialize(): Unit = {
if (!zooKeeper.getState.isAlive) {
info(s"Initializing a new session to $connectString.")
- var now = System.currentTimeMillis()
- val threshold = now + connectionTimeoutMs
- while (now < threshold) {
+ // retry forever until ZooKeeper can be instantiated
+ while (true) {
try {
zooKeeper.close()
zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs,
ZooKeeperClientWatcher)
- waitUntilConnected(threshold - now, TimeUnit.MILLISECONDS)
return
} catch {
- case _: Exception =>
- now = System.currentTimeMillis()
- if (now < threshold) {
- Thread.sleep(1000)
- now = System.currentTimeMillis()
- }
+ case e: Exception =>
+ info("Error when recreating ZooKeeper", e)
+ Thread.sleep(1000)
}
}
info(s"Timed out waiting for connection during session initialization
while in state: ${zooKeeper.getState}")
- stateChangeHandlers.foreach {case (name, handler) =>
handler.onReconnectionTimeout()}
+ stateChangeHandlers.values.foreach(_.onReconnectionTimeout())
}
}
@@ -299,23 +338,26 @@ class ZooKeeperClient(connectString: String,
initialize()
}
- private object ZooKeeperClientWatcher extends Watcher {
+ // package level visibility for testing only
+ private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
override def process(event: WatchedEvent): Unit = {
debug("Received event: " + event)
Option(event.getPath) match {
case None =>
+ val state = event.getState
+ stateToMeterMap.get(state).foreach(_.mark())
inLock(isConnectedOrExpiredLock) {
isConnectedOrExpiredCondition.signalAll()
}
- if (event.getState == KeeperState.AuthFailed) {
+ if (state == KeeperState.AuthFailed) {
error("Auth failed.")
- stateChangeHandlers.foreach {case (name, handler) =>
handler.onAuthFailure()}
- } else if (event.getState == KeeperState.Expired) {
+ stateChangeHandlers.values.foreach(_.onAuthFailure())
+ } else if (state == KeeperState.Expired) {
inWriteLock(initializationLock) {
info("Session expired.")
- stateChangeHandlers.foreach {case (name, handler) =>
handler.beforeInitializingSession()}
+ stateChangeHandlers.values.foreach(_.beforeInitializingSession())
initialize()
- stateChangeHandlers.foreach {case (name, handler) =>
handler.afterInitializingSession()}
+ stateChangeHandlers.values.foreach(_.afterInitializingSession())
}
}
case Some(path) =>
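The watcher above marks one meter per KeeperState (ZooKeeperDisconnectsPerSec, ZooKeeperSyncConnectsPerSec, ZooKeeperAuthFailuresPerSec, ZooKeeperReadOnlyConnectsPerSec, ZooKeeperSaslAuthenticationsPerSec, ZooKeeperExpiresPerSec). A sketch of reading one of them from the default yammer registry, the same way the tests below locate it:

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Meter
    import scala.collection.JavaConverters._

    // Illustrative only: look up the relocated expiry meter by its short name.
    val expiresMeter: Option[Meter] = Metrics.defaultRegistry.allMetrics.asScala.collectFirst {
      case (metricName, meter: Meter) if metricName.getName == "ZooKeeperExpiresPerSec" => meter
    }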
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 459688711e9..167b6a6f93f 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -205,15 +205,13 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
}
private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit
= {
- // Latency is rounded to milliseconds, so we may need to retry some
operations to get latency > 0.
- val (_, recorded) = TestUtils.computeUntilTrue({
- servers.head.zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
-
yammerMetricValue("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Double]
- })(latency => latency > 0.0)
- assertTrue("ZooKeeper latency not recorded", recorded)
-
- assertEquals(s"Unexpected ZK state
${server.zkUtils.zkConnection.getZookeeperState}",
- "CONNECTED", yammerMetricValue("SessionState"))
+ // Latency is rounded to milliseconds, so check the count instead.
+ val initialCount =
yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Long]
+ servers.head.zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
+ val newCount =
yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Long]
+ assertTrue("ZooKeeper latency not recorded", newCount > initialCount)
+
+ assertEquals(s"Unexpected ZK state", "CONNECTED",
yammerMetricValue("SessionState"))
}
private def verifyBrokerErrorMetrics(server: KafkaServer): Unit = {
@@ -276,6 +274,16 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
}
}
+ private def yammerHistogramCount(name: String): Any = {
+ val allMetrics = Metrics.defaultRegistry.allMetrics.asScala
+ val (_, metric) = allMetrics.find { case (n, _) =>
n.getMBeanName.endsWith(name) }
+ .getOrElse(fail(s"Unable to find broker metric $name: allMetrics:
${allMetrics.keySet.map(_.getMBeanName)}"))
+ metric match {
+ case m: Histogram => m.count
+ case m => fail(s"Unexpected broker metric of class ${m.getClass}")
+ }
+ }
+
private def verifyYammerMetricRecorded(name: String, verify: Double =>
Boolean = d => d > 0): Double = {
val metricValue = yammerMetricValue(name).asInstanceOf[Double]
assertTrue(s"Broker metric not recorded correctly for $name value
$metricValue", verify(metricValue))
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 7a88237cf9f..fb594d0e5ec 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -445,35 +445,35 @@ class AdminTest extends ZooKeeperTestHarness with Logging
with RackAwareTest {
// create a topic with a few config overrides and check that they are
applied
val maxMessageSize = 1024
val retentionMs = 1000 * 1000
- AdminUtils.createTopic(server.zkUtils, topic, partitions, 1,
makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+ adminZkClient.createTopic(topic, partitions, 1, makeConfig(maxMessageSize,
retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
//Standard topic configs will be propagated at topic creation time, but
the quota manager will not have been updated.
checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1",
false)
//Update dynamically and all properties should be applied
- AdminUtils.changeTopicConfig(server.zkUtils, topic,
makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+ adminZkClient.changeTopicConfig(topic, makeConfig(maxMessageSize,
retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1",
true)
// now double the config values for the topic and check that it is applied
val newConfig = makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*")
- AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2 *
maxMessageSize, 2 * retentionMs, "*", "*"))
+ adminZkClient.changeTopicConfig(topic, makeConfig(2 * maxMessageSize, 2 *
retentionMs, "*", "*"))
checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*",
quotaManagerIsThrottled = true)
// Verify that the same config can be read from ZK
- val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils,
ConfigType.Topic, topic)
+ val configInZk = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
assertEquals(newConfig, configInZk)
//Now delete the config
- AdminUtils.changeTopicConfig(server.zkUtils, topic, new Properties)
+ adminZkClient.changeTopicConfig(topic, new Properties)
checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "",
quotaManagerIsThrottled = false)
//Add config back
- AdminUtils.changeTopicConfig(server.zkUtils, topic,
makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+ adminZkClient.changeTopicConfig(topic, makeConfig(maxMessageSize,
retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1",
quotaManagerIsThrottled = true)
//Now ensure updating to "" removes the throttled replica list also
- AdminUtils.changeTopicConfig(server.zkUtils, topic,
propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""),
(LogConfig.LeaderReplicationThrottledReplicasProp, "")))
+ adminZkClient.changeTopicConfig(topic,
propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""),
(LogConfig.LeaderReplicationThrottledReplicasProp, "")))
checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "",
quotaManagerIsThrottled = false)
}
@@ -494,27 +494,27 @@ class AdminTest extends ZooKeeperTestHarness with Logging
with RackAwareTest {
val limit: Long = 1000000
// Set the limit & check it is applied to the log
- changeBrokerConfig(zkUtils, brokerIds, propsWith(
+ adminZkClient.changeBrokerConfig(brokerIds, propsWith(
(LeaderReplicationThrottledRateProp, limit.toString),
(FollowerReplicationThrottledRateProp, limit.toString)))
checkConfig(limit)
// Now double the config values for the topic and check that it is applied
val newLimit = 2 * limit
- changeBrokerConfig(zkUtils, brokerIds, propsWith(
+ adminZkClient.changeBrokerConfig(brokerIds, propsWith(
(LeaderReplicationThrottledRateProp, newLimit.toString),
(FollowerReplicationThrottledRateProp, newLimit.toString)))
checkConfig(newLimit)
// Verify that the same config can be read from ZK
for (brokerId <- brokerIds) {
- val configInZk = AdminUtils.fetchEntityConfig(servers(brokerId).zkUtils,
ConfigType.Broker, brokerId.toString)
+ val configInZk = adminZkClient.fetchEntityConfig(ConfigType.Broker,
brokerId.toString)
assertEquals(newLimit,
configInZk.getProperty(LeaderReplicationThrottledRateProp).toInt)
assertEquals(newLimit,
configInZk.getProperty(FollowerReplicationThrottledRateProp).toInt)
}
//Now delete the config
- changeBrokerConfig(servers(0).zkUtils, brokerIds, new Properties)
+ adminZkClient.changeBrokerConfig(brokerIds, new Properties)
checkConfig(DefaultReplicationThrottledRate)
}
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index bcddd40315b..135f7f1fd2e 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -23,6 +23,7 @@ import kafka.utils.TestUtils._
import kafka.api.FetchRequestBuilder
import kafka.message.ByteBufferMessageSet
import java.io.File
+import java.net.UnknownHostException
import kafka.log.LogManager
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -31,6 +32,7 @@ import
org.apache.kafka.common.serialization.{IntegerSerializer, StringSerialize
import org.I0Itec.zkclient.exception.ZkException
import org.junit.{Before, Test}
import org.junit.Assert._
+
import scala.reflect.ClassTag
class ServerShutdownTest extends ZooKeeperTestHarness {
@@ -130,7 +132,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
val newProps = TestUtils.createBrokerConfig(0, zkConnect)
newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
val newConfig = KafkaConfig.fromProps(newProps)
- verifyCleanShutdownAfterFailedStartup[ZkException](newConfig)
+ verifyCleanShutdownAfterFailedStartup[UnknownHostException](newConfig)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index eedf5f32ba9..ff9178a85cf 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -17,8 +17,6 @@
package kafka.server
-import java.net.BindException
-
import kafka.common.KafkaException
import kafka.utils.{TestUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
diff --git
a/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
b/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
deleted file mode 100644
index fda17c0201c..00000000000
--- a/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.api.ApiVersion
-import kafka.utils.ZkUtils
-import org.I0Itec.zkclient.ZkClient
-import org.apache.zookeeper.Watcher
-import org.easymock.EasyMock
-import org.junit.{Assert, Test}
-import Assert._
-import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.Meter
-import scala.collection.JavaConverters._
-
-class SessionExpireListenerTest {
-
- private val brokerId = 1
-
- private def cleanMetricsRegistry() {
- val metrics = Metrics.defaultRegistry
- metrics.allMetrics.keySet.asScala.foreach(metrics.removeMetric)
- }
-
- @Test
- def testSessionExpireListenerMetrics() {
-
- cleanMetricsRegistry()
-
- val metrics = Metrics.defaultRegistry
-
- def checkMeterCount(name: String, expected: Long) {
- val meter = metrics.allMetrics.asScala.collectFirst {
- case (metricName, meter: Meter) if metricName.getName == name => meter
- }.getOrElse(sys.error(s"Unable to find meter with name $name"))
- assertEquals(s"Unexpected meter count for $name", expected, meter.count)
- }
-
- val zkClient = EasyMock.mock(classOf[ZkClient])
- val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
- import Watcher._
- val healthcheck = new KafkaHealthcheck(brokerId, Seq.empty, zkUtils, None,
ApiVersion.latestVersion)
-
- val expiresPerSecName = "ZooKeeperExpiresPerSec"
- val disconnectsPerSecName = "ZooKeeperDisconnectsPerSec"
- checkMeterCount(expiresPerSecName, 0)
- checkMeterCount(disconnectsPerSecName, 0)
-
-
healthcheck.sessionExpireListener.handleStateChanged(Event.KeeperState.Expired)
- checkMeterCount(expiresPerSecName, 1)
- checkMeterCount(disconnectsPerSecName, 0)
-
-
healthcheck.sessionExpireListener.handleStateChanged(Event.KeeperState.Disconnected)
- checkMeterCount(expiresPerSecName, 1)
- checkMeterCount(disconnectsPerSecName, 1)
- }
-
-}
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index ecaa943d367..f0d6cf02d58 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -19,13 +19,17 @@ package kafka.zk
import java.util.{Properties, UUID}
import java.nio.charset.StandardCharsets.UTF_8
+import kafka.api.ApiVersion
+import kafka.cluster.EndPoint
import kafka.log.LogConfig
import kafka.security.auth._
import kafka.server.ConfigType
+import kafka.utils.CoreUtils
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.zookeeper.KeeperException.NodeExistsException
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert._
import org.junit.Test
class KafkaZkClientTest extends ZooKeeperTestHarness {
@@ -411,6 +415,44 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
}
+ @Test
+ def testBrokerRegistrationMethods() {
+ zkClient.createTopLevelPaths()
+
+ val brokerInfo = new BrokerInfo(1, "test.host", 9999,
+ Seq(new EndPoint("test.host", 9999,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT)),
+ 9998, None, ApiVersion.latestVersion)
+
+ zkClient.registerBrokerInZk(brokerInfo)
+ val broker = zkClient.getBroker(1).getOrElse(fail("Unregistered broker"))
+
+ assertEquals(brokerInfo.id, broker.id)
+ assertEquals(brokerInfo.endpoints(), broker.endPoints.mkString(","))
+ }
+
+ @Test
+ def testClusterIdMethods() {
+ val clusterId = CoreUtils.generateUuidAsBase64
+
+ zkClient.createOrGetClusterId(clusterId)
+ assertEquals(clusterId, zkClient.getClusterId.getOrElse(fail("No cluster
id found")))
+ }
+
+ @Test
+ def testBrokerSequenceIdMethods() {
+ val sequenceId = zkClient.generateBrokerSequenceId()
+ assertEquals(sequenceId + 1, zkClient.generateBrokerSequenceId)
+ }
+
+ @Test
+ def testCreateTopLevelPaths() {
+ zkClient.createTopLevelPaths()
+
+ ZkData.PersistentZkPaths.foreach {
+ path => assertTrue(zkClient.pathExists(path))
+ }
+ }
+
@Test
def testPreferredReplicaElectionMethods() {
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 75842f02d04..141dcee8e3a 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -23,18 +23,29 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
import javax.security.auth.login.Configuration
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Meter
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
import org.apache.kafka.common.utils.Time
-import org.apache.zookeeper.{CreateMode, ZooDefs}
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
+import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
+import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
-import org.junit.{After, Test}
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
class ZooKeeperClientTest extends ZooKeeperTestHarness {
private val mockPath = "/foo"
private val time = Time.SYSTEM
+ @Before
+ override def setUp() {
+ cleanMetricsRegistry()
+ super.setUp()
+ }
+
@After
override def tearDown() {
super.tearDown()
@@ -360,5 +371,35 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
}
}
+ @Test
+ def testSessionExpireListenerMetrics() {
+ val metrics = Metrics.defaultRegistry
+
+ def checkMeterCount(name: String, expected: Long) {
+ val meter = metrics.allMetrics.asScala.collectFirst {
+ case (metricName, meter: Meter) if metricName.getName == name => meter
+ }.getOrElse(sys.error(s"Unable to find meter with name $name"))
+ assertEquals(s"Unexpected meter count for $name", expected, meter.count)
+ }
+
+ val expiresPerSecName = "ZooKeeperExpiresPerSec"
+ val disconnectsPerSecName = "ZooKeeperDisconnectsPerSec"
+ checkMeterCount(expiresPerSecName, 0)
+ checkMeterCount(disconnectsPerSecName, 0)
+
+ zooKeeperClient.ZooKeeperClientWatcher.process(new
WatchedEvent(EventType.None, KeeperState.Expired, null))
+ checkMeterCount(expiresPerSecName, 1)
+ checkMeterCount(disconnectsPerSecName, 0)
+
+ zooKeeperClient.ZooKeeperClientWatcher.process(new
WatchedEvent(EventType.None, KeeperState.Disconnected, null))
+ checkMeterCount(expiresPerSecName, 1)
+ checkMeterCount(disconnectsPerSecName, 1)
+ }
+
+ private def cleanMetricsRegistry() {
+ val metrics = Metrics.defaultRegistry
+ metrics.allMetrics.keySet.asScala.foreach(metrics.removeMetric)
+ }
+
private def bytes =
UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8)
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 71758a51c05..0551fac169d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -97,7 +97,8 @@ private Properties getTopicConfigProperties(final String
changelog) {
CLUSTER.zKConnectString(),
DEFAULT_ZK_SESSION_TIMEOUT_MS,
DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
- Integer.MAX_VALUE, Time.SYSTEM);
+ Integer.MAX_VALUE, Time.SYSTEM,
+ "testMetricGroup", "testMetricType");
final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false,
Time.SYSTEM);
try {
final AdminZkClient adminZkClient = new
AdminZkClient(kafkaZkClient);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index e277d82461b..275d58065dd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -177,7 +177,9 @@ public void createTopic(final String topic,
DEFAULT_ZK_SESSION_TIMEOUT_MS,
DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
Integer.MAX_VALUE,
- Time.SYSTEM);
+ Time.SYSTEM,
+ "testMetricGroup",
+ "testMetricType");
final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false,
Time.SYSTEM);
final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
adminZkClient.createTopic(topic, partitions, replication, topicConfig,
RackAwareMode.Enforced$.MODULE$);
@@ -192,7 +194,9 @@ public void deleteTopic(final String topic) {
DEFAULT_ZK_SESSION_TIMEOUT_MS,
DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
Integer.MAX_VALUE,
- Time.SYSTEM);
+ Time.SYSTEM,
+ "testMetricGroup",
+ "testMetricType");
final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false,
Time.SYSTEM);
final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
adminZkClient.deleteTopic(topic);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> move ZK metrics in KafkaHealthCheck to ZookeeperClient
> ------------------------------------------------------
>
> Key: KAFKA-6320
> URL: https://issues.apache.org/jira/browse/KAFKA-6320
> Project: Kafka
> Issue Type: Sub-task
> Affects Versions: 1.0.0
> Reporter: Jun Rao
> Assignee: Jun Rao
> Fix For: 1.1.0
>
>
> In KAFKA-5473, we will be de-commissioning the usage of KafkaHealthCheck. So,
> we need to move the ZK metrics SessionState and ZooKeeper${eventType}PerSec
> in that class to somewhere else (e.g. ZookeeperClient).
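In the merged diff above, the relocation amounts to ZooKeeperClient owning the per-state meters and the SessionState gauge itself. A condensed sketch of that registration pattern (names match the hunks above; the surrounding class and KafkaMetricsGroup mixin are omitted):

    // Condensed from the ZooKeeperClient hunk above; KafkaMetricsGroup provides newMeter/newGauge.
    private val stateToMeterMap = Map(
      KeeperState.Disconnected      -> "Disconnects",
      KeeperState.SyncConnected     -> "SyncConnects",
      KeeperState.AuthFailed        -> "AuthFailures",
      KeeperState.ConnectedReadOnly -> "ReadOnlyConnects",
      KeeperState.SaslAuthenticated -> "SaslAuthentications",
      KeeperState.Expired           -> "Expires"
    ).map { case (state, eventType) =>
      state -> newMeter(s"ZooKeeper${eventType}PerSec", eventType.toLowerCase(Locale.ROOT), TimeUnit.SECONDS)
    }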
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)