Updated Branches: refs/heads/trunk 8959690e9 -> bfc4ba499
KAFKA-955 (followup patch) After a leader change, messages sent with ack=0 are lost; reviewed by Neha Narkhede and Jun Rao

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0c1885b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0c1885b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0c1885b8

Branch: refs/heads/trunk
Commit: 0c1885b800077e4d360935a6d91fe1068a684560
Parents: c12d2ea
Author: Guozhang Wang <[email protected]>
Authored: Fri Sep 13 16:17:55 2013 -0700
Committer: Jun Rao <[email protected]>
Committed: Fri Sep 13 16:17:55 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/ProducerRequest.scala | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/0c1885b8/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index fda3e39..c606351 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -135,12 +135,17 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
   }
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val producerResponseStatus = data.map {
-      case (topicAndPartition, data) =>
-        (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
+    if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) {
+      requestChannel.closeConnection(request.processor, request)
+    }
+    else {
+      val producerResponseStatus = data.map {
+        case (topicAndPartition, data) =>
+          (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
+      }
+      val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
+      requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
     }
-    val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
   }
 
   def emptyData(){
