Updated Branches:
  refs/heads/trunk 8959690e9 -> bfc4ba499

KAFKA-955 (follow-up patch): After a leader change, messages sent with ack=0 are lost; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0c1885b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0c1885b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0c1885b8

Branch: refs/heads/trunk
Commit: 0c1885b800077e4d360935a6d91fe1068a684560
Parents: c12d2ea
Author: Guozhang Wang <[email protected]>
Authored: Fri Sep 13 16:17:55 2013 -0700
Committer: Jun Rao <[email protected]>
Committed: Fri Sep 13 16:17:55 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/ProducerRequest.scala | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0c1885b8/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index fda3e39..c606351 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -135,12 +135,17 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
   }
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, 
request: RequestChannel.Request): Unit = {
-    val producerResponseStatus = data.map {
-      case (topicAndPartition, data) =>
-        (topicAndPartition, 
ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
 -1l))
+    if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) {
+        requestChannel.closeConnection(request.processor, request)
+    }
+    else {
+      val producerResponseStatus = data.map {
+        case (topicAndPartition, data) =>
+          (topicAndPartition, 
ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
 -1l))
+      }
+      val errorResponse = ProducerResponse(correlationId, 
producerResponseStatus)
+      requestChannel.sendResponse(new Response(request, new 
BoundedByteBufferSend(errorResponse)))
     }
-    val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
-    requestChannel.sendResponse(new Response(request, new 
BoundedByteBufferSend(errorResponse)))
   }
 
   def emptyData(){

Reply via email to