Repository: kafka Updated Branches: refs/heads/trunk 5c337d759 -> b1d325b3c
KAFKA-3091: Broker persists generated ID even when the ID can't be used due to duplicates …updated to a new valid one Author: Grant Henke <[email protected]> Reviewers: Gwen Shapira Closes #763 from granthenke/id-start-failure Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1d325b3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1d325b3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1d325b3 Branch: refs/heads/trunk Commit: b1d325b3c09cd95d69a66fac4a3760f57d3062c9 Parents: 5c337d7 Author: Grant Henke <[email protected]> Authored: Mon Jan 18 18:55:50 2016 -0800 Committer: Gwen Shapira <[email protected]> Committed: Mon Jan 18 18:55:50 2016 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/server/KafkaServer.scala | 25 ++++++++----- .../server/ServerGenerateBrokerIdTest.scala | 38 ++++++++++++++++++++ 2 files changed, 55 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d325b3/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 454633e..901ba2e 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -239,6 +239,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils) kafkaHealthcheck.startup() + // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it + checkpointBrokerId(config.brokerId) + /* register broker metrics */ registerStats() @@ -620,16 +623,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr 
*/ private def getBrokerId: Int = { var brokerId = config.brokerId - var logDirsWithoutMetaProps: List[String] = List() val brokerIdSet = mutable.HashSet[Int]() for (logDir <- config.logDirs) { val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read() - brokerMetadataOpt match { - case Some(brokerMetadata: BrokerMetadata) => - brokerIdSet.add(brokerMetadata.brokerId) - case None => - logDirsWithoutMetaProps ++= List(logDir) + brokerMetadataOpt.foreach { brokerMetadata => + brokerIdSet.add(brokerMetadata.brokerId) } } @@ -642,12 +641,22 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr else if(brokerIdSet.size == 1) // pick broker.id from meta.properties brokerId = brokerIdSet.last + brokerId + } + + private def checkpointBrokerId(brokerId: Int) { + var logDirsWithoutMetaProps: List[String] = List() + + for (logDir <- config.logDirs) { + val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read() + if(brokerMetadataOpt.isEmpty) + logDirsWithoutMetaProps ++= List(logDir) + } + for(logDir <- logDirsWithoutMetaProps) { val checkpoint = brokerMetadataCheckpoints(logDir) checkpoint.write(new BrokerMetadata(brokerId)) } - - brokerId } private def generateBrokerId: Int = { http://git-wip-us.apache.org/repos/asf/kafka/blob/b1d325b3/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala index 60ec561..c26ff13 100755 --- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala @@ -137,6 +137,44 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } + @Test + def testBrokerMetadataOnIdCollision() { + // Start a good server + 
val propsA = TestUtils.createBrokerConfig(1, zkConnect) + val configA = KafkaConfig.fromProps(propsA) + val serverA = new KafkaServer(configA) + serverA.startup() + + // Start a server that collides on the broker id + val propsB = TestUtils.createBrokerConfig(1, zkConnect) + val configB = KafkaConfig.fromProps(propsB) + val serverB = new KafkaServer(configB) + intercept[RuntimeException] { + serverB.startup() + } + + // verify no broker metadata was written + serverB.config.logDirs.foreach { logDir => + val brokerMetaFile = new File(logDir + File.separator + brokerMetaPropsFile) + assertFalse(brokerMetaFile.exists()) + } + + // adjust the broker config and start again + propsB.setProperty(KafkaConfig.BrokerIdProp, "2") + val newConfigB = KafkaConfig.fromProps(propsB) + val newServerB = new KafkaServer(newConfigB) + newServerB.startup() + + serverA.shutdown() + newServerB.shutdown() + // verify correct broker metadata was written + assertTrue(verifyBrokerMetadata(serverA.config.logDirs,1)) + assertTrue(verifyBrokerMetadata(newServerB.config.logDirs,2)) + CoreUtils.rm(serverA.config.logDirs) + CoreUtils.rm(newServerB.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) + } + def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = { for(logDir <- logDirs) { val brokerMetadataOpt = (new BrokerMetadataCheckpoint(
