Hi Jaikiran, This is a real bug, could you file a JIRA?
As for the fix, I think your proposal would be the right way to fix it. " Guozhang On Mon, Jan 19, 2015 at 9:07 AM, Jaikiran Pai <jai.forums2...@gmail.com> wrote: > I often see the following exception while running some tests > (ProducerFailureHandlingTest.testNoResponse is one such instance): > > > [2015-01-19 22:30:24,257] ERROR [Controller-0-to-broker-1-send-thread], > Controller 0 fails to send a request to broker > id:1,host:localhost,port:56729 (kafka.controller.RequestSendThread:103) > java.lang.NullPointerException > at kafka.controller.RequestSendThread.doWork(ControllerChannelManager. > scala:150) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) > > > Looking at that code in question, I can see that the NPE can be trigger > when the "receive" is null which can happen if the "isRunning" is false > (i.e a shutdown has been requested). The fix to prevent this seems > straightforward: > > diff --git > a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala > b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala > index eb492f0..10f4c5a 100644 > --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala > +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala > @@ -144,20 +144,22 @@ class RequestSendThread(val controllerId: Int, > Utils.swallow(Thread.sleep(300)) > } > } > - var response: RequestOrResponse = null > - request.requestId.get match { > - case RequestKeys.LeaderAndIsrKey => > - response = LeaderAndIsrResponse.readFrom(receive.buffer) > - case RequestKeys.StopReplicaKey => > - response = StopReplicaResponse.readFrom(receive.buffer) > - case RequestKeys.UpdateMetadataKey => > - response = UpdateMetadataResponse.readFrom(receive.buffer) > - } > - stateChangeLogger.trace("Controller %d epoch %d received > response %s for a request sent to broker %s" > - .format(controllerId, > controllerContext.epoch, response.toString, toBroker.toString)) > + if (receive != null) { > + var response: RequestOrResponse = null > + request.requestId.get match { > + case RequestKeys.LeaderAndIsrKey => > + response = LeaderAndIsrResponse.readFrom(receive.buffer) > + case RequestKeys.StopReplicaKey => > + response = StopReplicaResponse.readFrom(receive.buffer) > + case RequestKeys.UpdateMetadataKey => > + response = UpdateMetadataResponse.readFrom(receive.buffer) > + } > + stateChangeLogger.trace("Controller %d epoch %d received > response %s for a request sent to broker %s" > + .format(controllerId, controllerContext.epoch, > response.toString, toBroker.toString)) > > - if(callback != null) { > - callback(response) > + if (callback != null) { > + callback(response) > + } > } > } > > > However can this really be considered a fix or would this just be hiding > the real issue and would there be something more that will have to be done > in this case? I'm on trunk FWIW. > > > -Jaikiran > -- -- Guozhang