Repository: kafka
Updated Branches: refs/heads/trunk 65edd64ca -> 96959bc56
KAFKA-5250: Do fetch down conversion after throttling Perform down conversion after throttling to avoid retaining messages in memory during throttling since this could result in OOM. Also update bytesOut metrics after throttling. Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #3068 from rajinisivaram/KAFKA-5250 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/96959bc5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/96959bc5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/96959bc5 Branch: refs/heads/trunk Commit: 96959bc5620b6dc5900173a15cccfe83d20be944 Parents: 65edd64 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Tue May 16 08:49:21 2017 -0400 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Thu May 18 22:32:18 2017 +0100 ---------------------------------------------------------------------- .../src/main/scala/kafka/server/KafkaApis.scala | 87 ++++++++++++-------- 1 file changed, 51 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/96959bc5/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 02a1103..db15f72 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -492,37 +492,41 @@ class KafkaApis(val requestChannel: RequestChannel, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)) } - // the callback for sending a fetch response - def sendResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) { - val convertedPartitionData = { - 
responsePartitionData.map { case (tp, data) => - - // Down-conversion of the fetched records is needed when the stored magic version is - // greater than that supported by the client (as indicated by the fetch request version). If the - // configured magic version for the topic is less than or equal to that supported by the version of the - // fetch request, we skip the iteration through the records in order to check the magic version since we - // know it must be supported. However, if the magic version is changed from a higher version back to a - // lower version, this check will no longer be valid and we will fail to down-convert the messages - // which were written in the new format prior to the version downgrade. - val convertedData = replicaManager.getMagic(tp) match { - case Some(magic) if magic > 0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0) => - trace(s"Down converting message to V0 for fetch request from $clientId") - FetchPartitionData(data.error, data.hw, data.logStartOffset, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0)) - - case Some(magic) if magic > 1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1) => - trace(s"Down converting message to V1 for fetch request from $clientId") - FetchPartitionData(data.error, data.hw, data.logStartOffset, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1)) - - case _ => data - } + def convertedPartitionData(tp: TopicPartition, data: FetchResponse.PartitionData) = { + + // Down-conversion of the fetched records is needed when the stored magic version is + // greater than that supported by the client (as indicated by the fetch request version). If the + // configured magic version for the topic is less than or equal to that supported by the version of the + // fetch request, we skip the iteration through the records in order to check the magic version since we + // know it must be supported. 
However, if the magic version is changed from a higher version back to a + // lower version, this check will no longer be valid and we will fail to down-convert the messages + // which were written in the new format prior to the version downgrade. + replicaManager.getMagic(tp) match { + case Some(magic) if magic > 0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0) => + trace(s"Down converting message to V0 for fetch request from $clientId") + new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET, + data.logStartOffset, data.abortedTransactions, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0)) + + case Some(magic) if magic > 1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1) => + trace(s"Down converting message to V1 for fetch request from $clientId") + new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET, + data.logStartOffset, data.abortedTransactions, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1)) + + case _ => data + } + } - val abortedTransactions = convertedData.abortedTransactions.map(_.asJava).orNull - tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, - convertedData.logStartOffset, abortedTransactions, convertedData.records) + // the callback for process a fetch response, invoked before throttling + def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) { + val partitionData = { + responsePartitionData.map { case (tp, data) => + val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull + tp -> new FetchResponse.PartitionData(data.error, data.hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, + data.logStartOffset, abortedTransactions, data.records) } } - val mergedPartitionData = convertedPartitionData ++ unauthorizedForReadPartitionData ++ 
nonExistingOrUnauthorizedForDescribePartitionData + val mergedPartitionData = partitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]() @@ -532,17 +536,22 @@ class KafkaApis(val requestChannel: RequestChannel, s"on partition $topicPartition failed due to ${data.error.exceptionName}") fetchedPartitionData.put(topicPartition, data) - - // record the bytes out metrics only when the response is being sent - brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes) } - val response = new FetchResponse(fetchedPartitionData, 0) - val responseStruct = response.toStruct(versionId) - + // fetch response callback invoked after any throttling def fetchResponseCallback(bandwidthThrottleTimeMs: Int) { def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = { + val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData] + fetchedPartitionData.asScala.foreach(e => convertedData.put(e._1, convertedPartitionData(e._1, e._2))) + val response = new FetchResponse(convertedData, 0) + val responseStruct = response.toStruct(versionId) + trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.") + response.responseData.asScala.foreach { case (topicPartition, data) => + // record the bytes out metrics only when the response is being sent + brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes) + } + val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs, request.connectionId, request.header) new RequestChannel.Response(request, responseSend) } @@ -555,7 +564,8 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseMaybeThrottle(request, request.header.clientId, sendResponseCallback) } - // When this callback is 
triggered, the remote API call has completed + // When this callback is triggered, the remote API call has completed. + // Record time before any byte-rate throttling. request.apiRemoteCompleteTimeNanos = time.nanoseconds if (fetchRequest.isFromFollower) { @@ -564,13 +574,18 @@ class KafkaApis(val requestChannel: RequestChannel, quotas.leader.record(responseSize) fetchResponseCallback(bandwidthThrottleTimeMs = 0) } else { + // Fetch size used to determine throttle time is calculated before any down conversions. + // This may be slightly different from the actual response size. But since down conversions + // result in data being loaded into memory, it is better to do this after throttling to avoid OOM. + val response = new FetchResponse(fetchedPartitionData, 0) + val responseStruct = response.toStruct(versionId) quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, responseStruct.sizeOf, fetchResponseCallback) } } if (authorizedRequestInfo.isEmpty) - sendResponseCallback(Seq.empty) + processResponseCallback(Seq.empty) else { // call the replica manager to fetch messages from the local replica replicaManager.fetchMessages( @@ -581,7 +596,7 @@ class KafkaApis(val requestChannel: RequestChannel, versionId <= 2, authorizedRequestInfo, replicationQuota(fetchRequest), - sendResponseCallback, + processResponseCallback, fetchRequest.isolationLevel) } }