KAFKA-769: On startup, a broker's high watermark for every topic partition gets reset to zero; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/144a0a2a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/144a0a2a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/144a0a2a Branch: refs/heads/trunk Commit: 144a0a2ac02ecf6297f4dee4ae773be59095b1e7 Parents: 0be45b3 Author: Sriram Subramanian <sriram....@gmail.com> Authored: Fri Feb 22 15:26:03 2013 -0800 Committer: Neha Narkhede <neha.narkh...@gmail.com> Committed: Fri Feb 22 15:26:13 2013 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/server/ReplicaManager.scala | 9 +++++++-- 1 files changed, 7 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/144a0a2a/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1044085..4e6c8ea 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -49,6 +49,7 @@ class ReplicaManager(val config: KafkaConfig, this.logIdent = "Replica Manager on Broker " + config.brokerId + ": " private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap + private var hwThreadInitialized = false newGauge( "LeaderCount", @@ -92,8 +93,6 @@ class ReplicaManager(val config: KafkaConfig, def startup() { // start ISR expiration thread kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaLagTimeMaxMs) - // start high watermark checkpoint thread - startHighWaterMarksCheckPointThread() } def stopReplica(topic: String, partitionId: Int, deletePartition: 
Boolean): Short = { @@ -209,6 +208,12 @@ class ReplicaManager(val config: KafkaConfig, responseMap.put(topicAndPartition, errorCode) } info("Completed leader and isr request %s".format(leaderAndISRRequest)) + // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions + // have been completely populated before starting the checkpointing there by avoiding weird race conditions + if (!hwThreadInitialized) { + startHighWaterMarksCheckPointThread() + hwThreadInitialized = true + } replicaFetcherManager.shutdownIdleFetcherThreads() (responseMap, ErrorMapping.NoError) }