Repository: kafka
Updated Branches:
  refs/heads/0.8.2 bafecc936 -> 5f3eb1caf


kafka-1971; starting a broker with a conflicting id will delete the previous 
broker registration; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5f3eb1ca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5f3eb1ca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5f3eb1ca

Branch: refs/heads/0.8.2
Commit: 5f3eb1cafaecee746641fa43945cc0762635a0bd
Parents: bafecc9
Author: Jun Rao <jun...@gmail.com>
Authored: Mon Feb 23 12:13:21 2015 -0800
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Feb 23 12:13:21 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/server/KafkaHealthcheck.scala   |  7 +---
 .../main/scala/kafka/server/KafkaServer.scala   |  2 -
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  6 ---
 .../unit/kafka/server/ServerStartupTest.scala   | 43 +++++++++++++-------
 4 files changed, 30 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5f3eb1ca/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 4acdd70..7907987 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -39,17 +39,12 @@ class KafkaHealthcheck(private val brokerId: Int,
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
   val sessionExpireListener = new SessionExpireListener
-  
+
   def startup() {
     zkClient.subscribeStateChanges(sessionExpireListener)
     register()
   }
 
-  def shutdown() {
-    zkClient.unsubscribeStateChanges(sessionExpireListener)
-    ZkUtils.deregisterBrokerInZk(zkClient, brokerId)
-  }
-
   /**
    * Register this broker as "alive" in zookeeper
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f3eb1ca/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1691ad7..5cd4c84 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -268,8 +268,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       if (canShutdown) {
         Utils.swallow(controlledShutdown())
         brokerState.newState(BrokerShuttingDown)
-        if(kafkaHealthcheck != null)
-          Utils.swallow(kafkaHealthcheck.shutdown())
         if(socketServer != null)
           Utils.swallow(socketServer.shutdown())
         if(requestHandlerPool != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f3eb1ca/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 56e3e88..a6f4d46 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -179,12 +179,6 @@ object ZkUtils extends Logging {
     info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
   }
 
-  def deregisterBrokerInZk(zkClient: ZkClient, id: Int) {
-    val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
-    deletePath(zkClient, brokerIdPath)
-    info("Deregistered broker %d at path %s.".format(id, brokerIdPath))
-  }
-
   def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
     val topicDirs = new ZKGroupTopicDirs(group, topic)
     topicDirs.consumerOwnerDir + "/" + partition

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f3eb1ca/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index a0ed485..93af7df 100644
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import org.scalatest.junit.JUnit3Suite
-import kafka.zk
 import kafka.utils.ZkUtils
 import kafka.utils.Utils
 import kafka.utils.TestUtils
@@ -27,28 +26,44 @@ import kafka.zk.ZooKeeperTestHarness
 import junit.framework.Assert._
 
 class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
-  var server : KafkaServer = null
-  val brokerId = 0
-  val zookeeperChroot = "/kafka-chroot-for-unittest"
 
-  override def setUp() {
-    super.setUp()
+  def testBrokerCreatesZKChroot {
+    val brokerId = 0
+    val zookeeperChroot = "/kafka-chroot-for-unittest"
     val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
     val zooKeeperConnect = props.get("zookeeper.connect")
     props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
+    val server = TestUtils.createServer(new KafkaConfig(props))
 
-    server = TestUtils.createServer(new KafkaConfig(props))
-  }
+    val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
+    assertTrue(pathExists)
 
-  override def tearDown() {
     server.shutdown()
     Utils.rm(server.config.logDirs)
-    super.tearDown()
   }
 
-  def testBrokerCreatesZKChroot {
-    val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
-    assertTrue(pathExists)
-  }
+  def testConflictBrokerRegistration {
+    // Try starting a broker with a conflicting broker id.
+    // This shouldn't affect the existing broker registration.
+
+    val brokerId = 0
+    val props1 = TestUtils.createBrokerConfig(brokerId)
+    val server1 = TestUtils.createServer(new KafkaConfig(props1))
+    val brokerRegistration = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1
 
+    val props2 = TestUtils.createBrokerConfig(brokerId)
+    try {
+      TestUtils.createServer(new KafkaConfig(props2))
+      fail("Registering a broker with a conflicting id should fail")
+    } catch {
+      case e : RuntimeException =>
+      // this is expected
+    }
+
+    // broker registration shouldn't change
+    assertEquals(brokerRegistration, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
+
+    server1.shutdown()
+    Utils.rm(server1.config.logDirs)
+  }
 }
\ No newline at end of file

Reply via email to