KAFKA-901 follow up changes to fix update metadata response handling and request logging
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2d40ca30 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2d40ca30 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2d40ca30 Branch: refs/heads/trunk Commit: 2d40ca30d34b2d4de35d5b811858b0f915451a19 Parents: eff5933 Author: Neha Narkhede <nehanarkh...@apache.org> Authored: Tue May 21 10:01:51 2013 -0700 Committer: Neha Narkhede <nehanarkh...@apache.org> Committed: Tue May 21 10:01:51 2013 -0700 ---------------------------------------------------------------------- .../kafka/controller/ControllerChannelManager.scala | 15 +++++++++++++-- core/src/main/scala/kafka/server/KafkaApis.scala | 8 ++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2d40ca30/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 7e8ae29..0c41d1d 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -130,6 +130,8 @@ class RequestSendThread(val controllerId: Int, response = LeaderAndIsrResponse.readFrom(receive.buffer) case RequestKeys.StopReplicaKey => response = StopReplicaResponse.readFrom(receive.buffer) + case RequestKeys.UpdateMetadataKey => + response = UpdateMetadataResponse.readFrom(receive.buffer) } stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %d" .format(controllerId, controllerContext.epoch, response.correlationId, toBrokerId)) @@ -157,9 +159,18 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq def newBatch() { // raise error if the previous batch is not empty - if(leaderAndIsrRequestMap.size > 0 || stopReplicaRequestMap.size > 0) + if(leaderAndIsrRequestMap.size > 0) throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " + - "a new one. Some state changes %s might be lost ".format(leaderAndIsrRequestMap.toString())) + "a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString())) + if(stopReplicaRequestMap.size > 0) + throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " + + "new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString())) + if(updateMetadataRequestMap.size > 0) + throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " + + "new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString())) + if(stopAndDeleteReplicaRequestMap.size > 0) + throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " + + "new one. Some StopReplica with delete state changes %s might be lost ".format(stopAndDeleteReplicaRequestMap.toString())) leaderAndIsrRequestMap.clear() stopReplicaRequestMap.clear() updateMetadataRequestMap.clear() http://git-wip-us.apache.org/repos/asf/kafka/blob/2d40ca30/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0c5c4d5..93e2f04 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -101,12 +101,13 @@ class KafkaApis(val requestChannel: RequestChannel, def handleUpdateMetadataRequest(request: RequestChannel.Request) { val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest] + val stateChangeLogger = replicaManager.stateChangeLogger if(updateMetadataRequest.controllerEpoch < replicaManager.controllerEpoch) { val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " + "old controller %d with epoch %d. Latest known controller epoch is %d").format(brokerId, updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, replicaManager.controllerEpoch) - replicaManager.stateChangeLogger.warn(stateControllerEpochErrorMessage) + stateChangeLogger.warn(stateControllerEpochErrorMessage) throw new ControllerMovedException(stateControllerEpochErrorMessage) } partitionMetadataLock synchronized { @@ -115,7 +116,10 @@ class KafkaApis(val requestChannel: RequestChannel, updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b)) updateMetadataRequest.partitionStateInfos.foreach { partitionState => leaderCache.put(partitionState._1, partitionState._2) - debug("Caching leader info %s for partition %s".format(partitionState._2, partitionState._1)) + if(stateChangeLogger.isTraceEnabled) + stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " + + "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1, + updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId)) } } val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)