Repository: kafka
Updated Branches:
  refs/heads/trunk cbdcd5f10 -> f20e5108a


kafka-1738; Partitions for topic not created after restart from forced 
shutdown; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f20e5108
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f20e5108
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f20e5108

Branch: refs/heads/trunk
Commit: f20e5108a271e1fc37ca6752773efa74e2fef67f
Parents: cbdcd5f
Author: Jun Rao <[email protected]>
Authored: Fri Nov 7 10:46:21 2014 -0800
Committer: Jun Rao <[email protected]>
Committed: Fri Nov 7 10:46:21 2014 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/controller/ControllerChannelManager.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f20e5108/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ecbfa0f..eb492f0 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -130,10 +130,11 @@ class RequestSendThread(val controllerId: Int,
           // removeBroker which will invoke shutdown() on this thread. At that 
point, we will stop retrying.
           try {
             channel.send(request)
+            receive = channel.receive()
             isSendSuccessful = true
           } catch {
             case e: Throwable => // if the send was not successful, reconnect 
to broker and resend the message
-              error(("Controller %d epoch %d failed to send request %s to 
broker %s. " +
+              warn(("Controller %d epoch %d fails to send request %s to broker 
%s. " +
                 "Reconnecting to broker.").format(controllerId, 
controllerContext.epoch,
                 request.toString, toBroker.toString()), e)
               channel.disconnect()
@@ -143,7 +144,6 @@ class RequestSendThread(val controllerId: Int,
               Utils.swallow(Thread.sleep(300))
           }
         }
-        receive = channel.receive()
         var response: RequestOrResponse = null
         request.requestId.get match {
           case RequestKeys.LeaderAndIsrKey =>
@@ -162,7 +162,7 @@ class RequestSendThread(val controllerId: Int,
       }
     } catch {
       case e: Throwable =>
-        warn("Controller %d fails to send a request to broker 
%s".format(controllerId, toBroker.toString()), e)
+        error("Controller %d fails to send a request to broker 
%s".format(controllerId, toBroker.toString()), e)
         // If there is any socket error (eg, socket timeout), the channel is 
no longer usable and needs to be recreated.
         channel.disconnect()
     }

Reply via email to