KAFKA-901 follow-up changes to fix update metadata response handling and
request logging


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2d40ca30
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2d40ca30
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2d40ca30

Branch: refs/heads/trunk
Commit: 2d40ca30d34b2d4de35d5b811858b0f915451a19
Parents: eff5933
Author: Neha Narkhede <nehanarkh...@apache.org>
Authored: Tue May 21 10:01:51 2013 -0700
Committer: Neha Narkhede <nehanarkh...@apache.org>
Committed: Tue May 21 10:01:51 2013 -0700

----------------------------------------------------------------------
 .../kafka/controller/ControllerChannelManager.scala  | 15 +++++++++++++--
 core/src/main/scala/kafka/server/KafkaApis.scala     |  8 ++++++--
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2d40ca30/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 7e8ae29..0c41d1d 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -130,6 +130,8 @@ class RequestSendThread(val controllerId: Int,
             response = LeaderAndIsrResponse.readFrom(receive.buffer)
           case RequestKeys.StopReplicaKey =>
             response = StopReplicaResponse.readFrom(receive.buffer)
+          case RequestKeys.UpdateMetadataKey =>
+            response = UpdateMetadataResponse.readFrom(receive.buffer)
         }
         stateChangeLogger.trace("Controller %d epoch %d received response 
correlationId %d for a request sent to broker %d"
                                   .format(controllerId, 
controllerContext.epoch, response.correlationId, toBrokerId))
@@ -157,9 +159,18 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
 
   def newBatch() {
     // raise error if the previous batch is not empty
-    if(leaderAndIsrRequestMap.size > 0 || stopReplicaRequestMap.size > 0)
+    if(leaderAndIsrRequestMap.size > 0)
       throw new IllegalStateException("Controller to broker state change 
requests batch is not empty while creating " +
-        "a new one. Some state changes %s might be lost 
".format(leaderAndIsrRequestMap.toString()))
+        "a new one. Some LeaderAndIsr state changes %s might be lost 
".format(leaderAndIsrRequestMap.toString()))
+    if(stopReplicaRequestMap.size > 0)
+      throw new IllegalStateException("Controller to broker state change 
requests batch is not empty while creating a " +
+        "new one. Some StopReplica state changes %s might be lost 
".format(stopReplicaRequestMap.toString()))
+    if(updateMetadataRequestMap.size > 0)
+      throw new IllegalStateException("Controller to broker state change 
requests batch is not empty while creating a " +
+        "new one. Some UpdateMetadata state changes %s might be lost 
".format(updateMetadataRequestMap.toString()))
+    if(stopAndDeleteReplicaRequestMap.size > 0)
+      throw new IllegalStateException("Controller to broker state change 
requests batch is not empty while creating a " +
+        "new one. Some StopReplica with delete state changes %s might be lost 
".format(stopAndDeleteReplicaRequestMap.toString()))
     leaderAndIsrRequestMap.clear()
     stopReplicaRequestMap.clear()
     updateMetadataRequestMap.clear()

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d40ca30/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0c5c4d5..93e2f04 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -101,12 +101,13 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleUpdateMetadataRequest(request: RequestChannel.Request) {
     val updateMetadataRequest = 
request.requestObj.asInstanceOf[UpdateMetadataRequest]
+    val stateChangeLogger = replicaManager.stateChangeLogger
     if(updateMetadataRequest.controllerEpoch < replicaManager.controllerEpoch) 
{
       val stateControllerEpochErrorMessage = ("Broker %d received update 
metadata request with correlation id %d from an " +
         "old controller %d with epoch %d. Latest known controller epoch is 
%d").format(brokerId,
         updateMetadataRequest.correlationId, 
updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
         replicaManager.controllerEpoch)
-      replicaManager.stateChangeLogger.warn(stateControllerEpochErrorMessage)
+      stateChangeLogger.warn(stateControllerEpochErrorMessage)
       throw new ControllerMovedException(stateControllerEpochErrorMessage)
     }
     partitionMetadataLock synchronized {
@@ -115,7 +116,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, 
b))
       updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
         leaderCache.put(partitionState._1, partitionState._2)
-        debug("Caching leader info %s for partition 
%s".format(partitionState._2, partitionState._1))
+        if(stateChangeLogger.isTraceEnabled)
+          stateChangeLogger.trace(("Broker %d cached leader info %s for 
partition %s in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id 
%d").format(brokerId, partitionState._2, partitionState._1,
+            updateMetadataRequest.controllerId, 
updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
       }
     }
     val updateMetadataResponse = new 
UpdateMetadataResponse(updateMetadataRequest.correlationId)

Reply via email to