Repository: kafka Updated Branches: refs/heads/0.9.0 077cabbbc -> 5f316a58a
KAFKA-3427; Broker should return correct version of FetchResponse on exception Merging the fix from: https://issues.apache.org/jira/browse/KAFKA-3427 The original version of the code, returned a response using V0 of the response protocol. This caused clients to break because they expected the throttle_time_ms field to be present. Author: Aditya Auradkar <[email protected]> Reviewers: Jun Rao <[email protected]>, Ismael Juma <[email protected]> Closes #1128 from auradkar/k-34 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5f316a58 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5f316a58 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5f316a58 Branch: refs/heads/0.9.0 Commit: 5f316a58a1058ec230e8b4bfdfec1ce081589a20 Parents: 077cabb Author: Aditya Auradkar <[email protected]> Authored: Tue Apr 26 11:59:16 2016 -0700 Committer: Ismael Juma <[email protected]> Committed: Tue Apr 26 11:59:16 2016 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/api/FetchRequest.scala | 4 +++- core/src/main/scala/kafka/api/ProducerRequest.scala | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5f316a58/core/src/main/scala/kafka/api/FetchRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index 04ca157..191c627 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -148,7 +148,9 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, case (topicAndPartition, data) => (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty)) } - val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData) + + val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] + val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, fetchRequest.versionId) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse))) } http://git-wip-us.apache.org/repos/asf/kafka/blob/5f316a58/core/src/main/scala/kafka/api/ProducerRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index ce78f78..0c7510e 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -128,7 +128,8 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, } override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) { + val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] + if(produceRequest.requiredAcks == 0) { requestChannel.closeConnection(request.processor, request) } else { @@ -136,7 +137,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, case (topicAndPartition, data) => (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l)) } - val errorResponse = ProducerResponse(correlationId, producerResponseStatus) + val errorResponse = ProducerResponse(correlationId, producerResponseStatus, produceRequest.versionId) requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } }
