This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 591954e MINOR: Add registerController method to KafkaZkClient (#4598)
591954e is described below
commit 591954e2e55e0c262bf029f4b8f17566dcae6818
Author: Sandor Murakozi <[email protected]>
AuthorDate: Sat Jul 21 20:40:00 2018 +0200
MINOR: Add registerController method to KafkaZkClient (#4598)
Change KafkaController to use the newly introduced method.
Also remove the redundant `InZk` suffixes from `registerBrokerInZk` and
`updateBrokerInfoInZk`.
As `checkedEphemeralCreate` is not used outside of `KafkaZkClient`
any longer, reduce its visibility.
ControllerIntegrationTest already covers this functionality well, so it
validates the refactor.
Reviewers: Ismael Juma <[email protected]>
---
.../scala/kafka/controller/KafkaController.scala | 6 +++---
core/src/main/scala/kafka/server/KafkaServer.scala | 2 +-
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 17 ++++++++++++++---
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 2 +-
core/src/test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
.../test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 20 ++++++++++----------
6 files changed, 30 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 11d22fd..645080f 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -195,7 +195,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
private[kafka] def updateBrokerInfo(newBrokerInfo: BrokerInfo): Unit = {
this.brokerInfo = newBrokerInfo
- zkClient.updateBrokerInfoInZk(newBrokerInfo)
+ zkClient.updateBrokerInfo(newBrokerInfo)
}
private[kafka] def enableDefaultUncleanLeaderElection(): Unit = {
@@ -1208,7 +1208,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
}
try {
- zkClient.checkedEphemeralCreate(ControllerZNode.path,
ControllerZNode.encode(config.brokerId, timestamp))
+ zkClient.registerController(config.brokerId, timestamp)
info(s"${config.brokerId} successfully elected as the controller")
activeControllerId = config.brokerId
onControllerFailover()
@@ -1516,7 +1516,7 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
override def state: ControllerState = ControllerState.ControllerChange
override def process(): Unit = {
- zkClient.registerBrokerInZk(brokerInfo)
+ zkClient.registerBroker(brokerInfo)
Reelect.process()
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index f73ede6..6c1bb83 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -254,7 +254,7 @@ class KafkaServer(val config: KafkaConfig, time: Time =
Time.SYSTEM, threadNameP
replicaManager.startup()
val brokerInfo = createBrokerInfo
- zkClient.registerBrokerInZk(brokerInfo)
+ zkClient.registerBroker(brokerInfo)
// Now that the broker id is successfully registered, checkpoint it
checkpointBrokerId(config.brokerId)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index ec4932a..c45a90f 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -79,13 +79,24 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
createResponse.name
}
- def registerBrokerInZk(brokerInfo: BrokerInfo): Unit = {
+ def registerBroker(brokerInfo: BrokerInfo): Unit = {
val path = brokerInfo.path
checkedEphemeralCreate(path, brokerInfo.toJsonBytes)
info(s"Registered broker ${brokerInfo.broker.id} at path $path with
addresses: ${brokerInfo.broker.endPoints}")
}
- def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
+ /**
+ * Registers a given broker in zookeeper as the controller.
+ * @param controllerId the id of the broker that is to be registered as the
controller.
+ * @param timestamp the timestamp of the controller election.
+ * @throws KeeperException if an error is returned by ZooKeeper.
+ */
+ def registerController(controllerId: Int, timestamp: Long): Unit = {
+ val path = ControllerZNode.path
+ checkedEphemeralCreate(path, ControllerZNode.encode(controllerId,
timestamp))
+ }
+
+ def updateBrokerInfo(brokerInfo: BrokerInfo): Unit = {
val brokerIdPath = brokerInfo.path
val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes,
ZkVersion.NoVersion)
val response = retryRequestUntilConnected(setDataRequest)
@@ -1509,7 +1520,7 @@ class KafkaZkClient private (zooKeeperClient:
ZooKeeperClient, isSecure: Boolean
responses
}
- def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = {
+ private def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = {
val checkedEphemeral = new CheckedEphemeral(path, data)
info(s"Creating $path (is it secure? $isSecure)")
val code = checkedEphemeral.create()
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 2e8179c..cb261f6 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -687,7 +687,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with
Logging {
val securityProtocol = SecurityProtocol.PLAINTEXT
val endpoint = new EndPoint("localhost", 9092,
ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None),
ApiVersion.latestVersion, jmxPort = 9192)
- zkClient.registerBrokerInZk(brokerInfo)
+ zkClient.registerBroker(brokerInfo)
}
class DummyAdminZkClient(zkClient: KafkaZkClient) extends
AdminZkClient(zkClient) {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f89abb9..cf60c78 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -674,7 +674,7 @@ object TestUtils extends Logging {
val listenerName = ListenerName.forSecurityProtocol(protocol)
Broker(b.id, Seq(EndPoint("localhost", 6667, listenerName, protocol)),
b.rack)
}
- brokers.foreach(b => zkClient.registerBrokerInZk(BrokerInfo(Broker(b.id,
b.endPoints, rack = b.rack),
+ brokers.foreach(b => zkClient.registerBroker(BrokerInfo(Broker(b.id,
b.endPoints, rack = b.rack),
ApiVersion.latestVersion, jmxPort = -1)))
brokers
}
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index cc67a01..df009e8 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -630,17 +630,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val brokerInfo = createBrokerInfo(1, "test.host", 9999,
SecurityProtocol.PLAINTEXT)
val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2",
9995, SecurityProtocol.SSL)
- zkClient.registerBrokerInZk(brokerInfo)
+ zkClient.registerBroker(brokerInfo)
assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
assertEquals("Other ZK clients can read broker info",
Some(brokerInfo.broker), otherZkClient.getBroker(1))
// Node exists, owned by current session - no error, no update
- zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+ zkClient.registerBroker(differentBrokerInfoWithSameId)
assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
// Other client tries to register broker with same id causes failure, info
is not changed in ZK
intercept[NodeExistsException] {
- otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+ otherZkClient.registerBroker(differentBrokerInfoWithSameId)
}
assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
}
@@ -656,8 +656,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998,
SecurityProtocol.PLAINTEXT)
val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999,
SecurityProtocol.SSL)
- zkClient.registerBrokerInZk(brokerInfo1)
- otherZkClient.registerBrokerInZk(brokerInfo0)
+ zkClient.registerBroker(brokerInfo1)
+ otherZkClient.registerBroker(brokerInfo0)
assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
assertEquals(
@@ -674,17 +674,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// Updating info of a broker not existing in ZK fails
val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999,
SecurityProtocol.PLAINTEXT)
intercept[NoNodeException]{
- zkClient.updateBrokerInfoInZk(originalBrokerInfo)
+ zkClient.updateBrokerInfo(originalBrokerInfo)
}
- zkClient.registerBrokerInZk(originalBrokerInfo)
+ zkClient.registerBroker(originalBrokerInfo)
val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995,
SecurityProtocol.SSL)
- zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
+ zkClient.updateBrokerInfo(updatedBrokerInfo)
assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
// Other ZK clients can update info
- otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
+ otherZkClient.updateBrokerInfo(originalBrokerInfo)
assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
}
@@ -937,7 +937,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// No controller
assertEquals(None, zkClient.getControllerId)
// Create controller
- zkClient.checkedEphemeralCreate(ControllerZNode.path,
ControllerZNode.encode(brokerId = 1, timestamp = 123456))
+ zkClient.registerController(controllerId = 1, timestamp = 123456)
assertEquals(Some(1), zkClient.getControllerId)
zkClient.deleteController()
assertEquals(None, zkClient.getControllerId)