KAFKA-858 High watermark values can be overwritten during controlled shutdown; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef123c20
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef123c20
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef123c20

Branch: refs/heads/trunk
Commit: ef123c20b82dab2f9bfa125bbb81521b0ff806ae
Parents: d4a70eb
Author: Neha Narkhede <[email protected]>
Authored: Tue Apr 9 14:11:07 2013 -0700
Committer: Neha Narkhede <[email protected]>
Committed: Tue Apr 9 14:11:07 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/server/AbstractFetcherManager.scala |  4 ++--
 .../src/main/scala/kafka/server/ReplicaManager.scala | 15 +++++++--------
 2 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef123c20/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index be872dc..4269219 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -46,13 +46,13 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
           fetcherThread.start
       }
       fetcherThread.addPartition(topic, partitionId, initialOffset)
-      info("adding fetcher on topic %s, partition %d, initOffset %d to broker %d with fetcherId %d"
+      info("Adding fetcher for partition [%s,%d], initOffset %d to broker %d with fetcherId %d"
           .format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId))
     }
   }
 
   def removeFetcher(topic: String, partitionId: Int) {
-    info("removing fetcher on topic %s, partition %d".format(topic, partitionId))
+    info("Removing fetcher for partition [%s,%d]".format(topic, partitionId))
     mapLock synchronized {
       for ((key, fetcher) <- fetcherThreadMap) {
         fetcher.removePartition(topic, partitionId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef123c20/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6d849ac..4a41bde 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -120,11 +120,11 @@ class ReplicaManager(val config: KafkaConfig,
         leaderPartitionsLock synchronized {
           leaderPartitions -= replica.partition
         }
-        allPartitions.remove((topic, partitionId))
-        info("After removing partition [%s,%d], the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
+        if(deletePartition)
+          allPartitions.remove((topic, partitionId))
       case None => //do nothing if replica no longer exists
     }
-    stateChangeLogger.trace("Broker %d finished handling stop replica [%s,%d]".format(localBrokerId, topic, partitionId))
+    stateChangeLogger.trace("Broker %d finished handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))
     errorCode
   }
 
@@ -168,7 +168,7 @@ class ReplicaManager(val config: KafkaConfig,
     if(replicaOpt.isDefined)
       return replicaOpt.get
     else
-      throw new ReplicaNotAvailableException("Replica %d is not available for partiton [%s,%d] yet".format(config.brokerId, topic, partition))
+      throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d]".format(config.brokerId, topic, partition))
   }
 
   def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica = {
@@ -230,10 +230,9 @@ class ReplicaManager(val config: KafkaConfig,
           errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
       }
       responseMap.put(topicAndPartition, errorCode)
-      leaderAndISRRequest.partitionStateInfos.foreach(p =>
-        stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]"
-          .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
-                  leaderAndISRRequest.controllerEpoch, p._1._1, p._1._2)))
+      stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]"
+        .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch,
+        topicAndPartition._1, topicAndPartition._2))
     }
     info("Handled leader and isr request %s".format(leaderAndISRRequest))
     // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
