http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index ee8fa1e..bb3b14b 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network._ import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.requests.UpdateMetadataRequest.EndPoint -import org.apache.kafka.common.requests.{UpdateMetadataRequest, _} +import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{Node, TopicPartition, requests} @@ -281,10 +281,10 @@ class RequestSendThread(val controllerId: Int, class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging { val controllerContext = controller.controllerContext val controllerId: Int = controller.config.brokerId - val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]] + val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, LeaderAndIsrPartitionState]] val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]] val updateMetadataRequestBrokerSet = mutable.Set.empty[Int] - val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, PartitionStateInfo] + val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, MetadataPartitionState] private val stateChangeLogger = KafkaController.stateChangeLogger def newBatch() { @@ -310,12 +310,13 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, - replicas: Seq[Int], callback: AbstractResponse => Unit = null) { + replicas: Seq[Int], isNew: Boolean = false) { val topicPartition = new TopicPartition(topic, partition) brokerIds.filter(_ >= 0).foreach { brokerId => val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty) - result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)) + val alreadyNew = result.get(topicPartition).exists(_.isNew) + result.put(topicPartition, LeaderAndIsrPartitionState(leaderIsrAndControllerEpoch, replicas, isNew || alreadyNew)) } addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, @@ -345,7 +346,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging leaderIsrAndControllerEpochOpt match { case Some(l @ LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)) => val replicas = controllerContext.partitionReplicaAssignment(partition) - + val offlineReplicas = replicas.filter(!controllerContext.isReplicaOnline(_, partition)) val leaderIsrAndControllerEpoch = if (beingDeleted) { val leaderDuringDelete = LeaderAndIsr.duringDelete(leaderAndIsr.isr) LeaderIsrAndControllerEpoch(leaderDuringDelete, controllerEpoch) @@ -353,7 +354,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging l } - val partitionStateInfo = 
PartitionStateInfo(leaderIsrAndControllerEpoch, replicas) + val partitionStateInfo = MetadataPartitionState(leaderIsrAndControllerEpoch, replicas, offlineReplicas) updateMetadataRequestPartitionInfoMap.put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo) case None => @@ -379,8 +380,12 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging def sendRequestsToBrokers(controllerEpoch: Int) { try { - leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) => - partitionStateInfos.foreach { case (topicPartition, state) => + val leaderAndIsrRequestVersion: Short = + if (controller.config.interBrokerProtocolVersion >= KAFKA_0_11_1_IV0) 1 + else 0 + + leaderAndIsrRequestMap.foreach { case (broker, leaderAndIsrPartitionStates) => + leaderAndIsrPartitionStates.foreach { case (topicPartition, state) => val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" @@ -389,20 +394,21 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging state.leaderIsrAndControllerEpoch, broker, topicPartition.topic, topicPartition.partition)) } - val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet + val leaderIds = leaderAndIsrPartitionStates.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map { _.getNode(controller.config.interBrokerListenerName) } - val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) => - val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch + val partitionStates = leaderAndIsrPartitionStates.map { case (topicPartition, leaderAndIsrPartitionState) => + val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = leaderAndIsrPartitionState.leaderIsrAndControllerEpoch val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader, leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion, - partitionStateInfo.allReplicas.map(Integer.valueOf).asJava) + leaderAndIsrPartitionState.allReplicas.map(Integer.valueOf).asJava, leaderAndIsrPartitionState.isNew) topicPartition -> partitionState } - val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(controllerId, controllerEpoch, partitionStates.asJava, - leaders.asJava) - controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest) + val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId, + controllerEpoch, partitionStates.asJava, leaders.asJava) + controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequestBuilder, + (r: AbstractResponse) => controller.eventManager.put(controller.LeaderAndIsrResponseReceived(r, broker))) } leaderAndIsrRequestMap.clear() @@ -411,20 +417,21 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging updateMetadataRequestBrokerSet.toString(), p._1))) val partitionStates = updateMetadataRequestPartitionInfoMap.map { case (topicPartition, partitionStateInfo) => val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch - val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader, + val partitionState = new UpdateMetadataRequest.PartitionState(controllerEpoch, leaderIsr.leader, 
leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion, - partitionStateInfo.allReplicas.map(Integer.valueOf).asJava) + partitionStateInfo.allReplicas.map(Integer.valueOf).asJava, partitionStateInfo.offlineReplicas.map(Integer.valueOf).asJava) topicPartition -> partitionState } - val version: Short = - if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3 + val updateMetadataRequestVersion: Short = + if (controller.config.interBrokerProtocolVersion >= KAFKA_0_11_1_IV0) 4 + else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3 else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2 else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1 else 0 val updateMetadataRequest = { - val liveBrokers = if (version == 0) { + val liveBrokers = if (updateMetadataRequestVersion == 0) { // Version 0 of UpdateMetadataRequest only supports PLAINTEXT. controllerContext.liveOrShuttingDownBrokers.map { broker => val securityProtocol = SecurityProtocol.PLAINTEXT @@ -441,7 +448,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull) } } - new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, partitionStates.asJava, + new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava) }
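The batching changes above do two things: each queued LeaderAndIsr entry now carries an isNew flag that survives later updates for the same partition, and the request version is derived from the inter-broker protocol version (LeaderAndIsr v1 and UpdateMetadata v4 only from KAFKA_0_11_1_IV0 upward) so brokers on an older protocol keep receiving a request version they understand. A minimal sketch of the flag-merging rule, using simplified stand-in types (PendingPartitionState, LeaderAndIsrBatch) rather than the real controller classes:

import scala.collection.mutable
import org.apache.kafka.common.TopicPartition

// Simplified stand-in for LeaderAndIsrPartitionState; only the fields needed here.
case class PendingPartitionState(leader: Int, replicas: Seq[Int], isNew: Boolean)

class LeaderAndIsrBatch {
  // brokerId -> partition -> pending state, mirroring leaderAndIsrRequestMap above
  private val pending = mutable.Map.empty[Int, mutable.Map[TopicPartition, PendingPartitionState]]

  def add(brokerId: Int, tp: TopicPartition, leader: Int, replicas: Seq[Int], isNew: Boolean): Unit = {
    val forBroker = pending.getOrElseUpdate(brokerId, mutable.Map.empty)
    // Once a partition has been queued as "new" for this broker, keep that flag
    // even if a later call passes isNew = false.
    val alreadyNew = forBroker.get(tp).exists(_.isNew)
    forBroker.put(tp, PendingPartitionState(leader, replicas, isNew || alreadyNew))
  }
}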
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/controller/ControllerState.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala index 2f690bb..74029b1 100644 --- a/core/src/main/scala/kafka/controller/ControllerState.scala +++ b/core/src/main/scala/kafka/controller/ControllerState.scala @@ -78,6 +78,15 @@ object ControllerState { def value = 9 } + case object LeaderAndIsrResponseReceived extends ControllerState { + def value = 10 + } + + case object LogDirChange extends ControllerState { + def value = 11 + } + val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion, - PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange) + PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived, + LogDirChange) } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index ff47f14..3a61a59 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -33,7 +33,7 @@ import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener} import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, StopReplicaResponse} +import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, StopReplicaResponse, LeaderAndIsrResponse} import org.apache.kafka.common.utils.Time import org.apache.zookeeper.Watcher.Event.KeeperState @@ -52,6 +52,7 @@ class ControllerContext(val zkUtils: ZkUtils) { var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap + val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicAndPartition]] = mutable.HashMap.empty private var liveBrokersUnderlying: Set[Broker] = Set.empty private var liveBrokerIdsUnderlying: Set[Int] = Set.empty @@ -75,6 +76,14 @@ class ControllerContext(val zkUtils: ZkUtils) { }.toSet } + def isReplicaOnline(brokerId: Int, topicAndPartition: TopicAndPartition, includeShuttingDownBrokers: Boolean = false): Boolean = { + val brokerOnline = { + if (includeShuttingDownBrokers) liveOrShuttingDownBrokerIds.contains(brokerId) + else liveBrokerIds.contains(brokerId) + } + brokerOnline && !replicasOnOfflineDirs.getOrElse(brokerId, Set.empty).contains(topicAndPartition) + } + def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = { brokerIds.flatMap { brokerId => partitionReplicaAssignment.collect { @@ -98,7 +107,9 @@ class ControllerContext(val zkUtils: ZkUtils) { partitionReplicaAssignment.keySet.filter(topicAndPartition => topicAndPartition.topic == topic) def allLiveReplicas(): 
Set[PartitionAndReplica] = { - replicasOnBrokers(liveBrokerIds) + replicasOnBrokers(liveBrokerIds).filter { partitionAndReplica => + isReplicaOnline(partitionAndReplica.replica, TopicAndPartition(partitionAndReplica.topic, partitionAndReplica.partition)) + } } def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = { @@ -175,6 +186,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met private val partitionReassignmentListener = new PartitionReassignmentListener(this, eventManager) private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this, eventManager) private val isrChangeNotificationListener = new IsrChangeNotificationListener(this, eventManager) + private val logDirEventNotificationListener = new LogDirEventNotificationListener(this, eventManager) @volatile private var activeControllerId = -1 @volatile private var offlinePartitionCount = 0 @@ -248,6 +260,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met info("Broker %d starting become controller state transition".format(config.brokerId)) readControllerEpochFromZookeeper() incrementControllerEpoch() + LogDirUtils.deleteLogDirEvents(zkUtils) // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks registerPartitionReassignmentListener() @@ -256,6 +269,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met registerTopicChangeListener() registerTopicDeletionListener() registerBrokerChangeListener() + registerLogDirEventNotificationListener() initializeControllerContext() val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress() @@ -299,6 +313,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met deregisterIsrChangeNotificationListener() deregisterPartitionReassignmentListener() deregisterPreferredReplicaElectionListener() + deregisterLogDirEventNotificationListener() // reset topic deletion manager topicDeletionManager.reset() @@ -329,6 +344,17 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met */ def isActive: Boolean = activeControllerId == config.brokerId + /* + * This callback is invoked by the controller's LogDirEventNotificationListener with the list of broker ids who + * have experienced new log directory failures. In response the controller should send LeaderAndIsrRequest + * to all these brokers to query the state of their replicas + */ + def onBrokerLogDirFailure(brokerIds: Seq[Int]) { + // send LeaderAndIsrRequest for all replicas on those brokers to see if they are still online. + val replicasOnBrokers = controllerContext.replicasOnBrokers(brokerIds.toSet) + replicaStateMachine.handleStateChanges(replicasOnBrokers, OnlineReplica) + } + /** * This callback is invoked by the replica state machine's broker change listener, with the list of newly started * brokers as input. It does the following - @@ -345,6 +371,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met */ def onBrokerStartup(newBrokers: Seq[Int]) { info("New broker startup callback for %s".format(newBrokers.mkString(","))) + newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove) val newBrokersSet = newBrokers.toSet // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new // broker via this update. 
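Much of this patch hinges on the new ControllerContext.isReplicaOnline check shown above: a replica counts as online only when its broker is live (optionally including brokers in controlled shutdown) and the replica is not in that broker's set of replicas on failed log directories. A rough self-contained sketch of that rule, with ReplicaLivenessTracker and TopicAndPartition as simplified stand-ins for the controller's bookkeeping:

import scala.collection.mutable

// Stand-in for kafka.common.TopicAndPartition
case class TopicAndPartition(topic: String, partition: Int)

class ReplicaLivenessTracker {
  val liveBrokerIds = mutable.Set.empty[Int]
  val shuttingDownBrokerIds = mutable.Set.empty[Int]
  // brokerId -> replicas the broker reported as sitting on an offline log directory
  val replicasOnOfflineDirs = mutable.Map.empty[Int, Set[TopicAndPartition]]

  def isReplicaOnline(brokerId: Int, tp: TopicAndPartition,
                      includeShuttingDownBrokers: Boolean = false): Boolean = {
    val brokerOnline =
      if (includeShuttingDownBrokers) liveBrokerIds.contains(brokerId) || shuttingDownBrokerIds.contains(brokerId)
      else liveBrokerIds.contains(brokerId)
    // Being on a live broker is no longer enough; the replica's log directory must be healthy too.
    brokerOnline && !replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicAndPartition]).contains(tp)
  }
}

The ControlledShutdownLeaderSelector change later in this diff passes includeShuttingDownBrokers = true, mirroring its previous use of liveOrShuttingDownBrokerIds.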
@@ -374,46 +401,55 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met } } - /** + /* * This callback is invoked by the replica state machine's broker change listener with the list of failed brokers - * as input. It does the following - - * 1. Mark partitions with dead leaders as offline - * 2. Triggers the OnlinePartition state change for all new/offline partitions - * 3. Invokes the OfflineReplica state change on the input list of newly started brokers - * 4. If no partitions are effected then send UpdateMetadataRequest to live or shutting down brokers - * - * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point. This is because - * the partition state machine will refresh our cache for us when performing leader election for all new/offline - * partitions coming online. + * as input. It will call onReplicaBecomeOffline(...) with the list of replicas on those failed brokers as input. */ def onBrokerFailure(deadBrokers: Seq[Int]) { info("Broker failure callback for %s".format(deadBrokers.mkString(","))) + deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove) val deadBrokersThatWereShuttingDown = deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id)) info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown)) - val deadBrokersSet = deadBrokers.toSet - // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers + val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet) + onReplicasBecomeOffline(allReplicasOnDeadBrokers) + } + + /** + * This method marks the given replicas as offline. It does the following - + * 1. Mark the given partitions as offline + * 2. Triggers the OnlinePartition state change for all new/offline partitions + * 3. Invokes the OfflineReplica state change on the input list of newly offline replicas + * 4. If no partitions are affected then send UpdateMetadataRequest to live or shutting down brokers + * + * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point. This is because + * the partition state machine will refresh our cache for us when performing leader election for all new/offline + * partitions coming online. 
+ */ + def onReplicasBecomeOffline(newOfflineReplicas: Set[PartitionAndReplica]): Unit = { + val (newOfflineReplicasForDeletion, newOfflineReplicasNotForDeletion) = + newOfflineReplicas.partition(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic)) + val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader => - deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) && + !controllerContext.isReplicaOnline(partitionAndLeader._2.leaderAndIsr.leader, partitionAndLeader._1) && !topicDeletionManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet + + // trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition) // trigger OnlinePartition state changes for offline or new partitions partitionStateMachine.triggerOnlinePartitionStateChange() - // filter out the replicas that belong to topics that are being deleted - var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet) - val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic)) - // handle dead replicas - replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica) - // check if topic deletion state for the dead replicas needs to be updated - val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic)) - if(replicasForTopicsToBeDeleted.nonEmpty) { + // trigger OfflineReplica state change for those newly offline replicas + replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion, OfflineReplica) + + // fail deletion of topics that affected by the offline replicas + if (newOfflineReplicasForDeletion.nonEmpty) { // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be - // deleted when the broker is down. This will prevent the replica from being in TopicDeletionStarted state indefinitely + // deleted when its log directory is offline. 
This will prevent the replica from being in TopicDeletionStarted state indefinitely // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state - topicDeletionManager.failReplicaDeletion(replicasForTopicsToBeDeleted) + topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion) } - // If broker failure did not require leader re-election, inform brokers of failed broker + // If replica failure did not require leader re-election, inform brokers of the offline replica // Note that during leader re-election, brokers update their metadata if (partitionsWithoutLeader.isEmpty) { sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) @@ -724,10 +760,11 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = { val topicsToBeDeleted = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet - val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case (_, replicas) => - replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic) + val topicsWithOfflineReplicas = controllerContext.partitionReplicaAssignment.filter { case (partition, replicas) => + replicas.exists(r => !controllerContext.isReplicaOnline(r, partition)) + }.keySet.map(_.topic) val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic) - val topicsIneligibleForDeletion = topicsWithReplicasOnDeadBrokers | topicsForWhichPartitionReassignmentIsInProgress + val topicsIneligibleForDeletion = topicsWithOfflineReplicas | topicsForWhichPartitionReassignmentIsInProgress info("List of topics to be deleted: %s".format(topicsToBeDeleted.mkString(","))) info("List of topics ineligible for deletion: %s".format(topicsIneligibleForDeletion.mkString(","))) (topicsToBeDeleted, topicsIneligibleForDeletion) @@ -771,7 +808,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector) } else { // check if the leader is alive or not - if (controllerContext.liveBrokerIds.contains(currentLeader)) { + if (controllerContext.isReplicaOnline(currentLeader, topicAndPartition)) { info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) + "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(","))) // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest @@ -909,6 +946,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met } } + private def registerLogDirEventNotificationListener() = { + debug("Registering logDirEventNotificationListener") + zkUtils.zkClient.subscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener) + } + + private def deregisterLogDirEventNotificationListener() = { + debug("De-registering logDirEventNotificationListener") + zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener) + } + private def readControllerEpochFromZookeeper() { // initialize the controller epoch and zk version by reading from zookeeper if(controllerContext.zkUtils.pathExists(ZkUtils.ControllerEpochPath)) { @@ -1119,7 +1166,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: 
ZkUtils, time: Time, met topicsNotInPreferredReplica.keys.foreach { topicPartition => // do this check only if the broker is live and there are no partitions being reassigned currently // and preferred replica election is not in progress - if (controllerContext.liveBrokerIds.contains(leaderBroker) && + if (controllerContext.isReplicaOnline(leaderBroker, topicPartition) && controllerContext.partitionsBeingReassigned.isEmpty && !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) && controllerContext.allTopics.contains(topicPartition.topic)) { @@ -1352,6 +1399,22 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met } + case class LogDirEventNotification(sequenceNumbers: Seq[String]) extends ControllerEvent { + + def state = ControllerState.LogDirChange + + override def process(): Unit = { + val zkUtils = controllerContext.zkUtils + try { + val brokerIds = sequenceNumbers.flatMap(LogDirUtils.getBrokerIdFromLogDirEvent(zkUtils, _)) + onBrokerLogDirFailure(brokerIds) + } finally { + // delete processed children + sequenceNumbers.map(x => zkUtils.deletePath(ZkUtils.LogDirEventNotificationPath + "/" + x)) + } + } + } + case class PreferredReplicaLeaderElection(partitions: Set[TopicAndPartition]) extends ControllerEvent { def state = ControllerState.ManualLeaderBalance @@ -1447,7 +1510,35 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met } } - case class TopicDeletionStopReplicaResult(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent { + case class LeaderAndIsrResponseReceived(LeaderAndIsrResponseObj: AbstractResponse, brokerId: Int) extends ControllerEvent { + + def state = ControllerState.LeaderAndIsrResponseReceived + + override def process(): Unit = { + import JavaConverters._ + val leaderAndIsrResponse = LeaderAndIsrResponseObj.asInstanceOf[LeaderAndIsrResponse] + + if (leaderAndIsrResponse.error() != Errors.NONE) { + stateChangeLogger.error(s"Received error in leaderAndIsrResponse $leaderAndIsrResponse from broker $brokerId") + return + } + + val offlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.KAFKA_STORAGE_ERROR).keys.map( + tp => TopicAndPartition(tp.topic(), tp.partition())).toSet + val onlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.NONE).keys.map( + tp => TopicAndPartition(tp.topic(), tp.partition())).toSet + val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicAndPartition]) + val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas + controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas) + val newOfflineReplicas = (currentOfflineReplicas -- previousOfflineReplicas).map(tp => PartitionAndReplica(tp.topic, tp.partition, brokerId)) + stateChangeLogger.info(s"Mark replicas ${currentOfflineReplicas -- previousOfflineReplicas} on broker $brokerId as offline") + + if (newOfflineReplicas.nonEmpty) + onReplicasBecomeOffline(newOfflineReplicas) + } + } + + case class TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent { def state = ControllerState.TopicDeletion @@ -1607,6 +1698,20 @@ class TopicChangeListener(controller: KafkaController, eventManager: ControllerE } } +/** + * Called when broker notifies controller of log directory change + */ +class LogDirEventNotificationListener(controller: KafkaController, eventManager: 
ControllerEventManager) extends IZkChildListener with Logging { + override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = { + import JavaConverters._ + eventManager.put(controller.LogDirEventNotification(currentChilds.asScala)) + } +} + +object LogDirEventNotificationListener { + val version: Long = 1L +} + class PartitionModificationsListener(controller: KafkaController, eventManager: ControllerEventManager, topic: String) extends IZkDataListener with Logging { override def handleDataChange(dataPath: String, data: Any): Unit = { eventManager.put(controller.PartitionModifications(topic)) http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 54bbb89..e534ff3 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -54,8 +54,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { controllerContext.partitionReplicaAssignment.get(topicAndPartition) match { case Some(assignedReplicas) => - val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) - val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r)) + val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.isReplicaOnline(r, topicAndPartition)) + val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.isReplicaOnline(r, topicAndPartition)) val newLeaderAndIsr = if (liveBrokersInIsr.isEmpty) { // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration @@ -63,7 +63,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils, ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) { throw new NoReplicaOnlineException( - s"No broker in ISR for partition $topicAndPartition is alive. Live brokers are: [${controllerContext.liveBrokerIds}], " + + s"No replica in ISR for partition $topicAndPartition is alive. 
Live brokers are: [${controllerContext.liveBrokerIds}], " + s"ISR brokers are: [${currentLeaderAndIsr.isr.mkString(",")}]" ) } @@ -111,7 +111,7 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas val newLeaderOpt = reassignedInSyncReplicas.find { r => - controllerContext.liveBrokerIds.contains(r) && currentLeaderAndIsr.isr.contains(r) + controllerContext.isReplicaOnline(r, topicAndPartition) && currentLeaderAndIsr.isr.contains(r) } newLeaderOpt match { case Some(newLeader) => (currentLeaderAndIsr.newLeader(newLeader), reassignedInSyncReplicas) @@ -150,7 +150,7 @@ class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerConte info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) + " Triggering preferred replica leader election") // check if preferred replica is not the current leader and is alive and in the isr - if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) { + if (controllerContext.isReplicaOnline(preferredReplica, topicAndPartition) && currentLeaderAndIsr.isr.contains(preferredReplica)) { val newLeaderAndIsr = currentLeaderAndIsr.newLeader(preferredReplica) (newLeaderAndIsr, assignedReplicas) } else { @@ -174,8 +174,7 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) ext currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { val currentIsr = currentLeaderAndIsr.isr val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) - val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds - val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r)) + val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.isReplicaOnline(r, topicAndPartition, true)) val newIsr = currentIsr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId)) liveAssignedReplicas.find(newIsr.contains) match { http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 5751e17..5024c02 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -200,7 +200,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { controllerContext.partitionLeadershipInfo.get(topicPartition) match { case Some(currentLeaderIsrAndEpoch) => // else, check if the leader for partition is alive. 
If yes, it is in Online state, else it is in Offline state - if (controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader)) + if (controllerContext.isReplicaOnline(currentLeaderIsrAndEpoch.leaderAndIsr.leader, topicPartition)) // leader is alive partitionState.put(topicPartition, OnlinePartition) else @@ -227,7 +227,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { */ private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) = { val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition).toList - val liveAssignedReplicas = replicaAssignment.filter(controllerContext.liveBrokerIds.contains) + val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.isReplicaOnline(r, topicAndPartition)) liveAssignedReplicas.headOption match { case None => val failMsg = s"Controller $controllerId epoch ${controller.epoch} encountered error during state change of " + @@ -259,7 +259,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { topicAndPartition.topic, topicAndPartition.partition, leaderIsrAndControllerEpoch, - replicaAssignment + replicaAssignment, + isNew = true ) } catch { case _: ZkNodeExistsException => http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 43fac19..87c53b5 100755 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -149,7 +149,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { .format(replicaId, topicAndPartition) + "state as it is being requested to become leader") brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch, - replicaAssignment) + replicaAssignment, isNew = true) case None => // new leader request will be sent to this replica when one gets elected } replicaState.put(partitionAndReplica, NewReplica) @@ -283,7 +283,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { val partition = topicPartition.partition assignedReplicas.foreach { replicaId => val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId) - if (controllerContext.liveBrokerIds.contains(replicaId)) + if (controllerContext.isReplicaOnline(replicaId, topicPartition)) replicaState.put(partitionAndReplica, OnlineReplica) else // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted. 
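The leader selector and state machine hunks above all route their liveness decisions through the same isReplicaOnline test. For context, a condensed sketch of the selection rule in OfflinePartitionLeaderSelector as it reads after this change; this is not the real class, which also looks up the topic's unclean-leader-election config and throws on failure rather than returning None:

// onlineReplica is assumed to wrap isReplicaOnline for a fixed partition.
def selectOfflinePartitionLeader(assignedReplicas: Seq[Int],
                                 isr: Seq[Int],
                                 onlineReplica: Int => Boolean,
                                 uncleanElectionEnabled: Boolean): Option[(Int, Seq[Int])] = {
  val liveAssignedReplicas = assignedReplicas.filter(onlineReplica)
  val liveReplicasInIsr = isr.filter(onlineReplica)
  if (liveReplicasInIsr.nonEmpty)
    // Prefer a leader from the live ISR, in assignment order; the new ISR is the live subset of the old one.
    liveAssignedReplicas.find(liveReplicasInIsr.contains).map(leader => (leader, liveReplicasInIsr))
  else if (uncleanElectionEnabled)
    // Unclean election: fall back to any live assigned replica, shrinking the ISR to just the new leader.
    liveAssignedReplicas.headOption.map(leader => (leader, Seq(leader)))
  else
    // No live replica in ISR and unclean election disallowed: no leader can be chosen.
    None
}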
@@ -297,6 +297,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { def partitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[TopicAndPartition] = { controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq } + } sealed trait ReplicaState { http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index e483ac2..325488e 100755 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -299,7 +299,7 @@ class TopicDeletionManager(controller: KafkaController, eventManager: Controller debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(","))) controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted, new Callbacks.CallbackBuilder().stopReplicaCallback((stopReplicaResponseObj, replicaId) => - eventManager.put(controller.TopicDeletionStopReplicaResult(stopReplicaResponseObj, replicaId))).build) + eventManager.put(controller.TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj, replicaId))).build) if (deadReplicasForTopic.nonEmpty) { debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic)) markTopicIneligibleForDeletion(Set(topic)) http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 9322ff2..db2bd67 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -199,7 +199,8 @@ class GroupMetadataManager(brokerId: Int, | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => Errors.COORDINATOR_NOT_AVAILABLE - case Errors.NOT_LEADER_FOR_PARTITION => + case Errors.NOT_LEADER_FOR_PARTITION + | Errors.KAFKA_STORAGE_ERROR => Errors.NOT_COORDINATOR case Errors.REQUEST_TIMED_OUT => @@ -212,7 +213,7 @@ class GroupMetadataManager(brokerId: Int, error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " + s"${status.error.exceptionName}, returning UNKNOWN error code to the client") - Errors.UNKNOWN + Errors.UNKNOWN_SERVER_ERROR case other => error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " + @@ -342,7 +343,8 @@ class GroupMetadataManager(brokerId: Int, | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => Errors.COORDINATOR_NOT_AVAILABLE - case Errors.NOT_LEADER_FOR_PARTITION => + case Errors.NOT_LEADER_FOR_PARTITION + | Errors.KAFKA_STORAGE_ERROR => Errors.NOT_COORDINATOR case Errors.MESSAGE_TOO_LARGE @@ -695,7 +697,7 @@ class GroupMetadataManager(brokerId: Int, val timestampType = TimestampType.CREATE_TIME val timestamp = time.milliseconds() - val partitionOpt = replicaManager.getPartition(appendPartition) + val partitionOpt = replicaManager.getPartition(appendPartition).filter(_ ne ReplicaManager.OfflinePartition) partitionOpt.foreach { partition => val tombstones = 
ListBuffer.empty[SimpleRecord] removedOffsets.foreach { case (topicPartition, offsetAndMetadata) => http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 85c19c5..e201e91 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -266,7 +266,7 @@ class TransactionCoordinator(brokerId: Int, } def handleTxnImmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int) { - txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch, txnMarkerChannelManager.addTxnMarkersToSend) + txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch, txnMarkerChannelManager.addTxnMarkersToSend) } def handleTxnEmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int) { http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index e0d5076..394817c 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -485,12 +485,13 @@ class TransactionStateManager(brokerId: Int, | Errors.REQUEST_TIMED_OUT => // note that for timed out request we return NOT_AVAILABLE error code to let client retry Errors.COORDINATOR_NOT_AVAILABLE - case Errors.NOT_LEADER_FOR_PARTITION => + case Errors.NOT_LEADER_FOR_PARTITION + | Errors.KAFKA_STORAGE_ERROR => Errors.NOT_COORDINATOR case Errors.MESSAGE_TOO_LARGE | Errors.RECORD_LIST_TOO_LARGE => - Errors.UNKNOWN + Errors.UNKNOWN_SERVER_ERROR case other => other http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/log/AbstractIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index bfc6828..d569ad9 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -178,6 +178,12 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon trimToValidSize() } + def closeHandler() = { + // File handler of the index field will be closed after the mmap is garbage collected + CoreUtils.swallow(forceUnmap(mmap)) + mmap = null + } + /** * Do a basic sanity check on this index to detect obvious problems * http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 824d302..0fec75f 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -24,11 +24,11 @@ import java.util.concurrent.atomic._ import java.util.concurrent.{ConcurrentNavigableMap, 
ConcurrentSkipListMap, TimeUnit} import kafka.api.KAFKA_0_10_0_IV0 -import kafka.common._ +import kafka.common.{InvalidOffsetException, KafkaException, LongRef} import kafka.metrics.KafkaMetricsGroup -import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata} +import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata} import kafka.utils._ -import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.errors.{KafkaStorageException, CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest} @@ -136,7 +136,8 @@ class Log(@volatile var dir: File, val maxProducerIdExpirationMs: Int, val producerIdExpirationCheckIntervalMs: Int, val topicPartition: TopicPartition, - val producerStateManager: ProducerStateManager) extends Logging with KafkaMetricsGroup { + val producerStateManager: ProducerStateManager, + logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup { import kafka.log.Log._ @@ -164,7 +165,7 @@ class Log(@volatile var dir: File, * temporary abuse seems justifiable and saves us from scanning the log after deletion to find the first offsets * of each ongoing transaction in order to compute a new first unstable offset. It is possible, however, * that this could result in disagreement between replicas depending on when they began replicating the log. - * In the worst case, the LSO could be seen by a consumer to go backwards. + * In the worst case, the LSO could be seen by a consumer to go backwards. 
*/ @volatile var firstUnstableOffset: Option[LogOffsetMetadata] = None @@ -233,7 +234,7 @@ class Log(@volatile var dir: File, // create the log directory if it doesn't exist Files.createDirectories(dir.toPath) new LeaderEpochFileCache(topicPartition, () => logEndOffsetMetadata, - new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir))) + new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel)) } private def removeTempFilesAndCollectSwapFiles(): Set[File] = { @@ -266,6 +267,7 @@ class Log(@volatile var dir: File, swapFiles } + // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded private def loadSegmentFiles(): Unit = { // load segments in ascending order because transactional data from one segment may depend on the // segments that come before it @@ -340,6 +342,7 @@ class Log(@volatile var dir: File, bytesTruncated } + // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded private def completeSwapOperations(swapFiles: Set[File]): Unit = { for (swapFile <- swapFiles) { val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, "")) @@ -366,7 +369,8 @@ class Log(@volatile var dir: File, } } - /* Load the log segments from the log files on disk */ + // Load the log segments from the log files on disk + // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded private def loadSegments() { // first do a pass through the files in the log directory and remove any temporary files // and find any interrupted swap operations @@ -383,14 +387,14 @@ class Log(@volatile var dir: File, if(logSegments.isEmpty) { // no existing segments, create a new mutable segment beginning at offset 0 segments.put(0L, new LogSegment(dir = dir, - startOffset = 0, - indexIntervalBytes = config.indexInterval, - maxIndexSize = config.maxIndexSize, - rollJitterMs = config.randomSegmentJitter, - time = time, - fileAlreadyExists = false, - initFileSize = this.initFileSize(), - preallocate = config.preallocate)) + startOffset = 0, + indexIntervalBytes = config.indexInterval, + maxIndexSize = config.maxIndexSize, + rollJitterMs = config.randomSegmentJitter, + time = time, + fileAlreadyExists = false, + initFileSize = this.initFileSize(), + preallocate = config.preallocate)) } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) { recoverLog() // reset the index size of the currently active log segment to allow more entries @@ -403,6 +407,7 @@ class Log(@volatile var dir: File, nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size) } + // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded private def recoverLog() { // if we have the clean shutdown marker, skip recovery if(hasCleanShutdownFile) { @@ -532,6 +537,16 @@ class Log(@volatile var dir: File, } /** + * Close file handlers used by log but don't write to disk. 
This is used when the disk may have failed + */ + def closeHandlers() { + debug(s"Closing handlers of log $name") + lock synchronized { + logSegments.foreach(_.closeHandlers()) + } + } + + /** * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs * @param records The records to append * @param isFromClient Whether or not this append is from a producer @@ -566,16 +581,16 @@ class Log(@volatile var dir: File, * @return Information about the appended messages including the first and last offset. */ private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = { - val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient) + maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") { + val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient) - // return if we have no valid messages or if this is a duplicate of the last appended entry - if (appendInfo.shallowCount == 0) - return appendInfo + // return if we have no valid messages or if this is a duplicate of the last appended entry + if (appendInfo.shallowCount == 0) + return appendInfo - // trim any invalid bytes or partial messages before appending it to the on-disk log - var validRecords = trimInvalidBytes(records, appendInfo) + // trim any invalid bytes or partial messages before appending it to the on-disk log + var validRecords = trimInvalidBytes(records, appendInfo) - try { // they are valid, insert them in the log lock synchronized { @@ -695,8 +710,6 @@ class Log(@volatile var dir: File, appendInfo } - } catch { - case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e) } } @@ -730,12 +743,14 @@ class Log(@volatile var dir: File, // We don't have to write the log start offset to log-start-offset-checkpoint immediately. // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shutdown // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low. - lock synchronized { - if (offset > logStartOffset) { - logStartOffset = offset - leaderEpochCache.clearAndFlushEarliest(logStartOffset) - producerStateManager.truncateHead(logStartOffset) - updateFirstUnstableOffset() + maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $offset in dir ${dir.getParent}") { + lock synchronized { + if (offset > logStartOffset) { + logStartOffset = offset + leaderEpochCache.clearAndFlushEarliest(logStartOffset) + producerStateManager.truncateHead(logStartOffset) + updateFirstUnstableOffset() + } } } } @@ -892,64 +907,66 @@ class Log(@volatile var dir: File, */ def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false, isolationLevel: IsolationLevel): FetchDataInfo = { - trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size)) - - // Because we don't use lock for reading, the synchronization is a little bit tricky. - // We create the local variables to avoid race conditions with updates to the log. 
- val currentNextOffsetMetadata = nextOffsetMetadata - val next = currentNextOffsetMetadata.messageOffset - if (startOffset == next) { - val abortedTransactions = - if (isolationLevel == IsolationLevel.READ_COMMITTED) Some(List.empty[AbortedTransaction]) - else None - return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false, - abortedTransactions = abortedTransactions) - } + maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") { + trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size)) + + // Because we don't use lock for reading, the synchronization is a little bit tricky. + // We create the local variables to avoid race conditions with updates to the log. + val currentNextOffsetMetadata = nextOffsetMetadata + val next = currentNextOffsetMetadata.messageOffset + if (startOffset == next) { + val abortedTransactions = + if (isolationLevel == IsolationLevel.READ_COMMITTED) Some(List.empty[AbortedTransaction]) + else None + return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false, + abortedTransactions = abortedTransactions) + } - var segmentEntry = segments.floorEntry(startOffset) - - // return error on attempt to read beyond the log end offset or read below log start offset - if (startOffset > next || segmentEntry == null || startOffset < logStartOffset) - throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next)) - - // Do the read on the segment with a base offset less than the target offset - // but if that segment doesn't contain any messages with an offset greater than that - // continue to read from successive segments until we get some messages or we reach the end of the log - while(segmentEntry != null) { - val segment = segmentEntry.getValue - - // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after - // the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may - // cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log - // end of the active segment. - val maxPosition = { - if (segmentEntry == segments.lastEntry) { - val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong - // Check the segment again in case a new segment has just rolled out. - if (segmentEntry != segments.lastEntry) + var segmentEntry = segments.floorEntry(startOffset) + + // return error on attempt to read beyond the log end offset or read below log start offset + if (startOffset > next || segmentEntry == null || startOffset < logStartOffset) + throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next)) + + // Do the read on the segment with a base offset less than the target offset + // but if that segment doesn't contain any messages with an offset greater than that + // continue to read from successive segments until we get some messages or we reach the end of the log + while (segmentEntry != null) { + val segment = segmentEntry.getValue + + // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after + // the message is appended but before the nextOffsetMetadata is updated. 
In that case the second fetch may + // cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log + // end of the active segment. + val maxPosition = { + if (segmentEntry == segments.lastEntry) { + val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong + // Check the segment again in case a new segment has just rolled out. + if (segmentEntry != segments.lastEntry) // New log segment has rolled out, we can read up to the file end. + segment.size + else + exposedPos + } else { segment.size - else - exposedPos - } else { - segment.size + } } - } - val fetchInfo = segment.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage) - if (fetchInfo == null) { - segmentEntry = segments.higherEntry(segmentEntry.getKey) - } else { - return isolationLevel match { - case IsolationLevel.READ_UNCOMMITTED => fetchInfo - case IsolationLevel.READ_COMMITTED => addAbortedTransactions(startOffset, segmentEntry, fetchInfo) + val fetchInfo = segment.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage) + if (fetchInfo == null) { + segmentEntry = segments.higherEntry(segmentEntry.getKey) + } else { + return isolationLevel match { + case IsolationLevel.READ_UNCOMMITTED => fetchInfo + case IsolationLevel.READ_COMMITTED => addAbortedTransactions(startOffset, segmentEntry, fetchInfo) + } } } - } - // okay we are beyond the end of the last segment with no data fetched although the start offset is in range, - // this can happen when all messages with offset larger than start offsets have been deleted. - // In this case, we will return the empty set with log end offset metadata - FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY) + // okay we are beyond the end of the last segment with no data fetched although the start offset is in range, + // this can happen when all messages with offset larger than start offsets have been deleted. + // In this case, we will return the empty set with log end offset metadata + FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY) + } } private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = { @@ -1011,35 +1028,37 @@ class Log(@volatile var dir: File, * None if no such message is found. */ def fetchOffsetsByTimestamp(targetTimestamp: Long): Option[TimestampOffset] = { - debug(s"Searching offset for timestamp $targetTimestamp") + maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") { + debug(s"Searching offset for timestamp $targetTimestamp") - if (config.messageFormatVersion < KAFKA_0_10_0_IV0 && + if (config.messageFormatVersion < KAFKA_0_10_0_IV0 && targetTimestamp != ListOffsetRequest.EARLIEST_TIMESTAMP && targetTimestamp != ListOffsetRequest.LATEST_TIMESTAMP) - throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " + + throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " + s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " + s"required version $KAFKA_0_10_0_IV0") - // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides - // constant time access while being safe to use with concurrent collections unlike `toArray`. - val segmentsCopy = logSegments.toBuffer - // For the earliest and latest, we do not need to return the timestamp. 
- if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP) + // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides + // constant time access while being safe to use with concurrent collections unlike `toArray`. + val segmentsCopy = logSegments.toBuffer + // For the earliest and latest, we do not need to return the timestamp. + if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP) return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset)) - else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP) + else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP) return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logEndOffset)) - val targetSeg = { - // Get all the segments whose largest timestamp is smaller than target timestamp - val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp) - // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one. - if (earlierSegs.length < segmentsCopy.length) - Some(segmentsCopy(earlierSegs.length)) - else - None - } + val targetSeg = { + // Get all the segments whose largest timestamp is smaller than target timestamp + val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp) + // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one. + if (earlierSegs.length < segmentsCopy.length) + Some(segmentsCopy(earlierSegs.length)) + else + None + } - targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset)) + targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset)) + } } /** @@ -1070,18 +1089,20 @@ class Log(@volatile var dir: File, } private def deleteSegments(deletable: Iterable[LogSegment]): Int = { - val numToDelete = deletable.size - if (numToDelete > 0) { - // we must always have at least one segment, so if we are going to delete all the segments, create a new one first - if (segments.size == numToDelete) - roll() - lock synchronized { - // remove the segments for lookups - deletable.foreach(deleteSegment) - maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset) + maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") { + val numToDelete = deletable.size + if (numToDelete > 0) { + // we must always have at least one segment, so if we are going to delete all the segments, create a new one first + if (segments.size == numToDelete) + roll() + lock synchronized { + // remove the segments for lookups + deletable.foreach(deleteSegment) + maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset) + } } + numToDelete } - numToDelete } /** @@ -1204,58 +1225,60 @@ class Log(@volatile var dir: File, * @return The newly rolled segment */ def roll(expectedNextOffset: Long = 0): LogSegment = { - val start = time.nanoseconds - lock synchronized { - val newOffset = math.max(expectedNextOffset, logEndOffset) - val logFile = Log.logFile(dir, newOffset) - val offsetIdxFile = offsetIndexFile(dir, newOffset) - val timeIdxFile = timeIndexFile(dir, newOffset) - val txnIdxFile = transactionIndexFile(dir, newOffset) - for(file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) { - warn("Newly rolled segment file " + file.getName + " already exists; deleting it first") - file.delete() - } - - segments.lastEntry() match { - case null => - case entry => { - val seg = entry.getValue - seg.onBecomeInactiveSegment() - 
seg.index.trimToValidSize() - seg.timeIndex.trimToValidSize() - seg.log.trim() + maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") { + val start = time.nanoseconds + lock synchronized { + val newOffset = math.max(expectedNextOffset, logEndOffset) + val logFile = Log.logFile(dir, newOffset) + val offsetIdxFile = offsetIndexFile(dir, newOffset) + val timeIdxFile = timeIndexFile(dir, newOffset) + val txnIdxFile = transactionIndexFile(dir, newOffset) + for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) { + warn("Newly rolled segment file " + file.getName + " already exists; deleting it first") + file.delete() } - } - // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot - // offset align with the new segment offset since this ensures we can recover the segment by beginning - // with the corresponding snapshot file and scanning the segment data. Because the segment base offset - // may actually be ahead of the current producer state end offset (which corresponds to the log end offset), - // we manually override the state offset here prior to taking the snapshot. - producerStateManager.updateMapEndOffset(newOffset) - producerStateManager.takeSnapshot() + segments.lastEntry() match { + case null => + case entry => { + val seg = entry.getValue + seg.onBecomeInactiveSegment() + seg.index.trimToValidSize() + seg.timeIndex.trimToValidSize() + seg.log.trim() + } + } - val segment = new LogSegment(dir, - startOffset = newOffset, - indexIntervalBytes = config.indexInterval, - maxIndexSize = config.maxIndexSize, - rollJitterMs = config.randomSegmentJitter, - time = time, - fileAlreadyExists = false, - initFileSize = initFileSize, - preallocate = config.preallocate) - val prev = addSegment(segment) - if(prev != null) - throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset)) - // We need to update the segment base offset and append position data of the metadata when log rolls. - // The next offset should not change. - updateLogEndOffset(nextOffsetMetadata.messageOffset) - // schedule an asynchronous flush of the old segment - scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L) - - info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0))) + // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot + // offset align with the new segment offset since this ensures we can recover the segment by beginning + // with the corresponding snapshot file and scanning the segment data. Because the segment base offset + // may actually be ahead of the current producer state end offset (which corresponds to the log end offset), + // we manually override the state offset here prior to taking the snapshot. 
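
roll() follows a fixed sequence under the log lock: choose the new base offset as max(expectedNextOffset, logEndOffset), remove any stale files already sitting at that offset, trim the indexes of the segment that is becoming inactive, snapshot producer state at the new offset, install the new segment, and hand the old data to a background flush. A schematic, in-memory version of that sequence (SketchSegment, SketchLog and flushUpTo are invented names; file and index handling is omitted):

    import java.util.concurrent.{Executors, TimeUnit}

    final class SketchSegment(val baseOffset: Long) {
      @volatile var trimmed = false
      def onBecomeInactive(): Unit = trimmed = true   // stands in for trimToValidSize()/trim()
    }

    final class SketchLog {
      private val lock = new Object
      private val scheduler = Executors.newSingleThreadScheduledExecutor()
      private var segments = List(new SketchSegment(0L))
      @volatile var logEndOffset: Long = 0L

      def roll(expectedNextOffset: Long = 0L): SketchSegment = lock.synchronized {
        // The new segment must never start behind the current end offset.
        val newOffset = math.max(expectedNextOffset, logEndOffset)
        segments.headOption.foreach(_.onBecomeInactive())  // shrink the previously active segment
        // (the real roll() also snapshots producer state at newOffset before installing the segment)
        val segment = new SketchSegment(newOffset)
        segments = segment :: segments
        // Flush the data that just became inactive off the hot path, like scheduler.schedule("flush-log", ...)
        scheduler.schedule(new Runnable { def run(): Unit = flushUpTo(newOffset) }, 0L, TimeUnit.MILLISECONDS)
        segment
      }

      private def flushUpTo(offset: Long): Unit = println(s"flushed up to offset $offset")
    }
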
+ producerStateManager.updateMapEndOffset(newOffset) + producerStateManager.takeSnapshot() - segment + val segment = new LogSegment(dir, + startOffset = newOffset, + indexIntervalBytes = config.indexInterval, + maxIndexSize = config.maxIndexSize, + rollJitterMs = config.randomSegmentJitter, + time = time, + fileAlreadyExists = false, + initFileSize = initFileSize, + preallocate = config.preallocate) + val prev = addSegment(segment) + if (prev != null) + throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset)) + // We need to update the segment base offset and append position data of the metadata when log rolls. + // The next offset should not change. + updateLogEndOffset(nextOffsetMetadata.messageOffset) + // schedule an asynchronous flush of the old segment + scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L) + + info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0 * 1000.0))) + + segment + } } } @@ -1275,22 +1298,24 @@ class Log(@volatile var dir: File, * @param offset The offset to flush up to (non-inclusive); the new recovery point */ def flush(offset: Long) : Unit = { - if (offset <= this.recoveryPoint) - return - debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " + - time.milliseconds + " unflushed = " + unflushedMessages) - for(segment <- logSegments(this.recoveryPoint, offset)) - segment.flush() + maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") { + if (offset <= this.recoveryPoint) + return + debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " + + time.milliseconds + " unflushed = " + unflushedMessages) + for (segment <- logSegments(this.recoveryPoint, offset)) + segment.flush() - // now that we have flushed, we can cleanup old producer snapshots. However, it is useful to retain - // the snapshots from the recent segments in case we need to truncate and rebuild the producer state. - // Otherwise, we would always need to rebuild from the earliest segment. - producerStateManager.deleteSnapshotsBefore(minSnapshotOffsetToRetain(offset)) + // now that we have flushed, we can cleanup old producer snapshots. However, it is useful to retain + // the snapshots from the recent segments in case we need to truncate and rebuild the producer state. + // Otherwise, we would always need to rebuild from the earliest segment. 
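
flush(offset) stays cheap to call repeatedly because it returns immediately whenever the target offset is at or below the current recovery point; the expensive fsync of the affected segments happens outside the lock, and only the small recovery-point and last-flush-time update is done under it, with a re-check in case a concurrent flush already advanced the point. A reduced sketch of that pattern (DiskLog and forceToDisk are made-up names):

    final class DiskLog {
      private val lock = new Object
      @volatile private var recoveryPoint: Long = 0L
      @volatile private var lastFlushMs: Long = 0L

      def flush(offset: Long): Unit = {
        if (offset <= recoveryPoint) return        // everything up to offset is already safe on disk
        forceToDisk(recoveryPoint, offset)         // fsync the dirty range outside the lock
        lock.synchronized {
          if (offset > recoveryPoint) {            // re-check: another flush may have moved it forward
            recoveryPoint = offset
            lastFlushMs = System.currentTimeMillis()
          }
        }
      }

      private def forceToDisk(from: Long, to: Long): Unit =
        println(s"fsync segments covering offsets [$from, $to)")
    }
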
+ producerStateManager.deleteSnapshotsBefore(minSnapshotOffsetToRetain(offset)) - lock synchronized { - if(offset > this.recoveryPoint) { - this.recoveryPoint = offset - lastflushedTime.set(time.milliseconds) + lock synchronized { + if (offset > this.recoveryPoint) { + this.recoveryPoint = offset + lastflushedTime.set(time.milliseconds) + } } } } @@ -1310,11 +1335,13 @@ class Log(@volatile var dir: File, * Completely delete this log directory and all contents from the file system with no delay */ private[log] def delete() { - lock synchronized { - logSegments.foreach(_.delete()) - segments.clear() - leaderEpochCache.clear() - Utils.delete(dir) + maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") { + lock synchronized { + logSegments.foreach(_.delete()) + segments.clear() + leaderEpochCache.clear() + Utils.delete(dir) + } } } @@ -1344,25 +1371,27 @@ class Log(@volatile var dir: File, * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete. */ private[log] def truncateTo(targetOffset: Long) { - if(targetOffset < 0) - throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset)) - if(targetOffset >= logEndOffset) { - info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset-1)) - return - } - info("Truncating log %s to offset %d.".format(name, targetOffset)) - lock synchronized { - if(segments.firstEntry.getValue.baseOffset > targetOffset) { - truncateFullyAndStartAt(targetOffset) - } else { - val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset) - deletable.foreach(deleteSegment) - activeSegment.truncateTo(targetOffset) - updateLogEndOffset(targetOffset) - this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) - this.logStartOffset = math.min(targetOffset, this.logStartOffset) - leaderEpochCache.clearAndFlushLatest(targetOffset) - loadProducerState(targetOffset, reloadFromCleanShutdown = false) + maybeHandleIOException(s"Error while truncating log to offset $targetOffset for $topicPartition in dir ${dir.getParent}") { + if (targetOffset < 0) + throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset)) + if (targetOffset >= logEndOffset) { + info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset - 1)) + return + } + info("Truncating log %s to offset %d.".format(name, targetOffset)) + lock synchronized { + if (segments.firstEntry.getValue.baseOffset > targetOffset) { + truncateFullyAndStartAt(targetOffset) + } else { + val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset) + deletable.foreach(deleteSegment) + activeSegment.truncateTo(targetOffset) + updateLogEndOffset(targetOffset) + this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) + this.logStartOffset = math.min(targetOffset, this.logStartOffset) + leaderEpochCache.clearAndFlushLatest(targetOffset) + loadProducerState(targetOffset, reloadFromCleanShutdown = false) + } } } } @@ -1373,28 +1402,30 @@ class Log(@volatile var dir: File, * @param newOffset The new offset to start the log with */ private[log] def truncateFullyAndStartAt(newOffset: Long) { - debug(s"Truncate and start log '$name' at offset $newOffset") - lock synchronized { - val segmentsToDelete = logSegments.toList - segmentsToDelete.foreach(deleteSegment) - addSegment(new LogSegment(dir, - 
newOffset, - indexIntervalBytes = config.indexInterval, - maxIndexSize = config.maxIndexSize, - rollJitterMs = config.randomSegmentJitter, - time = time, - fileAlreadyExists = false, - initFileSize = initFileSize, - preallocate = config.preallocate)) - updateLogEndOffset(newOffset) - leaderEpochCache.clearAndFlush() - - producerStateManager.truncate() - producerStateManager.updateMapEndOffset(newOffset) - updateFirstUnstableOffset() + maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") { + debug(s"Truncate and start log '$name' at offset $newOffset") + lock synchronized { + val segmentsToDelete = logSegments.toList + segmentsToDelete.foreach(deleteSegment) + addSegment(new LogSegment(dir, + newOffset, + indexIntervalBytes = config.indexInterval, + maxIndexSize = config.maxIndexSize, + rollJitterMs = config.randomSegmentJitter, + time = time, + fileAlreadyExists = false, + initFileSize = initFileSize, + preallocate = config.preallocate)) + updateLogEndOffset(newOffset) + leaderEpochCache.clearAndFlush() + + producerStateManager.truncate() + producerStateManager.updateMapEndOffset(newOffset) + updateFirstUnstableOffset() - this.recoveryPoint = math.min(newOffset, this.recoveryPoint) - this.logStartOffset = newOffset + this.recoveryPoint = math.min(newOffset, this.recoveryPoint) + this.logStartOffset = newOffset + } } } @@ -1439,6 +1470,9 @@ class Log(@volatile var dir: File, * This allows reads to happen concurrently without synchronization and without the possibility of physically * deleting a file while it is being read from. * + * This method does not need to convert IOException to KafkaStorageException because it is either called before all logs are loaded + * or the immediate caller will catch and handle IOException + * * @param segment The log segment to schedule for deletion */ private def deleteSegment(segment: LogSegment) { @@ -1452,13 +1486,18 @@ class Log(@volatile var dir: File, /** * Perform an asynchronous delete on the given file if it exists (otherwise do nothing) * - * @throws KafkaStorageException if the file can't be renamed and still exists + * This method does not need to convert IOException (thrown from changeFileSuffixes) to KafkaStorageException because + * it is either called before all logs are loaded or the caller will catch and handle IOException + * + * @throws IOException if the file can't be renamed and still exists */ private def asyncDeleteSegment(segment: LogSegment) { segment.changeFileSuffixes("", Log.DeletedFileSuffix) def deleteSeg() { info("Deleting segment %d from log %s.".format(segment.baseOffset, name)) - segment.delete() + maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") { + segment.delete() + } } scheduler.schedule("delete-file", deleteSeg _, delay = config.fileDeleteDelayMs) } @@ -1467,6 +1506,9 @@ class Log(@volatile var dir: File, * Swap a new segment in place and delete one or more existing segments in a crash-safe manner. The old segments will * be asynchronously deleted. * + * This method does not need to convert IOException to KafkaStorageException because it is either called before all logs are loaded + * or the caller will catch and handle IOException + * * The sequence of operations is: * <ol> * <li> Cleaner creates new segment with suffix .cleaned and invokes replaceSegments(). 
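
asyncDeleteSegment keeps segment removal both crash safe and off the hot path: the segment's files are first renamed with the .deleted suffix, which immediately hides them from normal lookups while remaining recognizable by name if the broker crashes, and the physical deletion runs later on the scheduler after the configured delay (config.fileDeleteDelayMs). A simplified sketch of the rename-then-schedule idea (PendingDelete and its layout are illustrative only):

    import java.io.File
    import java.util.concurrent.{Executors, TimeUnit}

    object PendingDelete {
      private val scheduler = Executors.newSingleThreadScheduledExecutor()
      val DeletedSuffix = ".deleted"

      def asyncDelete(file: File, delayMs: Long): Unit = {
        // Step 1: rename so readers and retention logic no longer see the live file.
        val marked = new File(file.getParentFile, file.getName + DeletedSuffix)
        if (!file.renameTo(marked) && file.exists())
          throw new java.io.IOException(s"Failed to rename ${file.getAbsolutePath} for deletion")
        // Step 2: physically delete later, off the request-handling path.
        scheduler.schedule(new Runnable {
          def run(): Unit = if (!marked.delete()) println(s"Could not delete ${marked.getAbsolutePath}")
        }, delayMs, TimeUnit.MILLISECONDS)
      }
    }

In the patch itself the rename happens through segment.changeFileSuffixes, and the delayed delete body is wrapped in maybeHandleIOException, so an IOException during the background deletion is reported through the log-dir failure channel like any other disk error.
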
@@ -1524,6 +1566,16 @@ class Log(@volatile var dir: File, */ def addSegment(segment: LogSegment) = this.segments.put(segment.baseOffset, segment) + private def maybeHandleIOException[T](msg: => String)(fun: => T): T = { + try { + fun + } catch { + case e: IOException => + logDirFailureChannel.maybeAddLogFailureEvent(dir.getParent) + throw new KafkaStorageException(msg, e) + } + } + } /** @@ -1574,11 +1626,12 @@ object Log { brokerTopicStats: BrokerTopicStats, time: Time = Time.SYSTEM, maxProducerIdExpirationMs: Int = 60 * 60 * 1000, - producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000): Log = { + producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000, + logDirFailureChannel: LogDirFailureChannel = null): Log = { val topicPartition = Log.parseTopicPartitionName(dir) val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs) new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs, - producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager) + producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager, logDirFailureChannel) } /**
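
The new maybeHandleIOException helper gives every disk-touching method in Log the same failure policy: run the operation, and if it throws IOException, record the parent directory in the LogDirFailureChannel (so the broker can take that log directory offline) and rethrow as a KafkaStorageException carrying the descriptive, per-call message. A stripped-down sketch of the wrapper; FailureChannel, StorageException and SketchLog below are simplified stand-ins rather than the real Kafka classes:

    import java.io.IOException
    import java.util.concurrent.LinkedBlockingQueue

    // Collects directories that have hit an IO error; a background thread can drain the queue
    // and take the affected log directory offline.
    final class FailureChannel {
      private val offlineDirs = new LinkedBlockingQueue[String]()
      def maybeAddFailure(dir: String): Unit = offlineDirs.offer(dir)
      def takeNextFailure(): String = offlineDirs.take()   // blocks until a failure is reported
    }

    class StorageException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

    final class SketchLog(dirParent: String, failureChannel: FailureChannel) {
      // The message is call-by-name, so it is only built if an IOException actually occurs.
      private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
        try fun
        catch {
          case e: IOException =>
            failureChannel.maybeAddFailure(dirParent)
            throw new StorageException(msg, e)
        }
      }

      def flush(offset: Long): Unit =
        maybeHandleIOException(s"Error while flushing log in dir $dirParent with offset $offset") {
          // ... fsync work that may throw IOException ...
        }
    }

Note that the per-method messages in the patch all name the topic partition and dir.getParent, which is what lets the failure handler map an exception back to the right log directory; deleteSegment and replaceSegments deliberately skip the wrapper because their callers handle IOException themselves, as the added doc comments explain.
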