KAFKA-3259 KAFKA-3253; KIP-31/KIP-32 Follow-up This PR includes a number of clean-ups: * Code style * Documentation wording improvements * Efficiency improvements
Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Jun Rao <jun...@gmail.com> Closes #943 from ijuma/kafka-3259-kip-31-32-clean-ups Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01aeea7c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01aeea7c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01aeea7c Branch: refs/heads/trunk Commit: 01aeea7c7bca34f1edce40116b7721335938b13b Parents: 1bfadda Author: Ismael Juma <ism...@juma.me.uk> Authored: Wed Feb 24 11:28:53 2016 -0800 Committer: Jun Rao <jun...@gmail.com> Committed: Wed Feb 24 11:28:53 2016 -0800 ---------------------------------------------------------------------- .../org/apache/kafka/common/record/Record.java | 2 +- .../kafka/common/record/TimestampType.java | 32 +-- .../org/apache/kafka/common/utils/Utils.java | 2 +- core/src/main/scala/kafka/api/ApiVersion.scala | 14 +- core/src/main/scala/kafka/common/LongRef.scala | 61 +++++ .../controller/ControllerChannelManager.scala | 2 +- .../kafka/coordinator/GroupCoordinator.scala | 8 +- .../javaapi/message/ByteBufferMessageSet.scala | 5 +- .../main/scala/kafka/log/FileMessageSet.scala | 25 +- core/src/main/scala/kafka/log/Log.scala | 65 ++--- core/src/main/scala/kafka/log/LogCleaner.scala | 62 ++--- core/src/main/scala/kafka/log/LogConfig.scala | 20 +- core/src/main/scala/kafka/log/LogManager.scala | 10 +- .../kafka/message/ByteBufferMessageSet.scala | 239 +++++++++---------- core/src/main/scala/kafka/message/Message.scala | 53 ++-- .../main/scala/kafka/message/MessageSet.scala | 20 +- .../scala/kafka/message/MessageWriter.scala | 2 +- .../main/scala/kafka/server/ConfigHandler.scala | 35 ++- .../src/main/scala/kafka/server/KafkaApis.scala | 71 +++--- .../main/scala/kafka/server/KafkaConfig.scala | 34 ++- .../main/scala/kafka/server/KafkaServer.scala | 12 +- .../kafka/server/ReplicaFetcherThread.scala | 4 +- .../scala/kafka/server/ReplicaManager.scala | 29 +-- .../scala/kafka/tools/SimpleConsumerShell.scala | 2 +- .../kafka/api/BaseConsumerTest.scala | 18 +- .../kafka/api/PlaintextConsumerTest.scala | 2 +- .../kafka/consumer/ConsumerIteratorTest.scala | 8 +- .../GroupCoordinatorResponseTest.scala | 2 +- .../test/scala/unit/kafka/log/CleanerTest.scala | 13 +- .../unit/kafka/log/FileMessageSetTest.scala | 4 +- .../scala/unit/kafka/log/LogConfigTest.scala | 22 +- .../scala/unit/kafka/log/LogManagerTest.scala | 1 - .../scala/unit/kafka/log/LogSegmentTest.scala | 9 +- .../src/test/scala/unit/kafka/log/LogTest.scala | 34 +-- .../message/ByteBufferMessageSetTest.scala | 136 ++++++----- .../unit/kafka/server/KafkaConfigTest.scala | 2 +- docs/upgrade.html | 44 ++-- 37 files changed, 575 insertions(+), 529 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/clients/src/main/java/org/apache/kafka/common/record/Record.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index 8390dc7..147ad86 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -347,7 +347,7 @@ public final class Record { if (magic() == 0) return TimestampType.NO_TIMESTAMP_TYPE; else - return wrapperRecordTimestampType == null ? TimestampType.getTimestampType(attributes()) : wrapperRecordTimestampType; + return wrapperRecordTimestampType == null ? TimestampType.forAttributes(attributes()) : wrapperRecordTimestampType; } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java index ab12a35..62fd814 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java @@ -25,34 +25,28 @@ import java.util.NoSuchElementException; public enum TimestampType { NO_TIMESTAMP_TYPE(-1, "NoTimestampType"), CREATE_TIME(0, "CreateTime"), LOG_APPEND_TIME(1, "LogAppendTime"); - public final int value; + public final int id; public final String name; - TimestampType(int value, String name) { - this.value = value; + TimestampType(int id, String name) { + this.id = id; this.name = name; } - public static TimestampType getTimestampType(byte attributes) { - int timestampType = (attributes & Record.TIMESTAMP_TYPE_MASK) >> Record.TIMESTAMP_TYPE_ATTRIBUTE_OFFSET; - return timestampType == 0 ? CREATE_TIME : LOG_APPEND_TIME; + public byte updateAttributes(byte attributes) { + return this == CREATE_TIME ? + (byte) (attributes & ~Record.TIMESTAMP_TYPE_MASK) : (byte) (attributes | Record.TIMESTAMP_TYPE_MASK); } - public static byte setTimestampType(byte attributes, TimestampType timestampType) { - return timestampType == CREATE_TIME ? - (byte) (attributes & ~Record.TIMESTAMP_TYPE_MASK) : (byte) (attributes | Record.TIMESTAMP_TYPE_MASK); + public static TimestampType forAttributes(byte attributes) { + int timestampType = (attributes & Record.TIMESTAMP_TYPE_MASK) >> Record.TIMESTAMP_TYPE_ATTRIBUTE_OFFSET; + return timestampType == 0 ? CREATE_TIME : LOG_APPEND_TIME; } public static TimestampType forName(String name) { - switch (name) { - case "NoTimestampType": - return NO_TIMESTAMP_TYPE; - case "CreateTime": - return CREATE_TIME; - case "LogAppendTime": - return LOG_APPEND_TIME; - default: - throw new NoSuchElementException("Invalid timestamp type " + name); - } + for (TimestampType t : values()) + if (t.name.equals(name)) + return t; + throw new NoSuchElementException("Invalid timestamp type " + name); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 8df54a4..daef458 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -166,7 +166,7 @@ public class Utils { * @param buffer The buffer to write to * @param value The value to write */ - public static void writetUnsignedInt(ByteBuffer buffer, long value) { + public static void writeUnsignedInt(ByteBuffer buffer, long value) { buffer.putInt((int) (value & 0xffffffffL)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/api/ApiVersion.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index 6b5fb7c..e2cadd1 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -51,9 +51,14 @@ object ApiVersion { "0.10.0" -> KAFKA_0_10_0_IV0 ) - def apply(version: String): ApiVersion = versionNameMap(version.split("\\.").slice(0,3).mkString(".")) + private val versionPattern = "\\.".r + + def apply(version: String): ApiVersion = + versionNameMap.getOrElse(versionPattern.split(version).slice(0, 3).mkString("."), + throw new IllegalArgumentException(s"Version `$version` is not a valid version")) def latestVersion = versionNameMap.values.max + } sealed trait ApiVersion extends Ordered[ApiVersion] { @@ -61,13 +66,8 @@ sealed trait ApiVersion extends Ordered[ApiVersion] { val messageFormatVersion: Byte val id: Int - override def compare(that: ApiVersion): Int = { + override def compare(that: ApiVersion): Int = ApiVersion.orderingByVersion.compare(this, that) - } - - def onOrAfter(that: ApiVersion): Boolean = { - this.compare(that) >= 0 - } override def toString(): String = version } http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/common/LongRef.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/LongRef.scala b/core/src/main/scala/kafka/common/LongRef.scala new file mode 100644 index 0000000..f2b1e32 --- /dev/null +++ b/core/src/main/scala/kafka/common/LongRef.scala @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +/** + * A mutable cell that holds a value of type `Long`. One should generally prefer using value-based programming (i.e. + * passing and returning `Long` values), but this class can be useful in some scenarios. + * + * Unlike `AtomicLong`, this class is not thread-safe and there are no atomicity guarantees. + */ +class LongRef(var value: Long) { + + def addAndGet(delta: Long): Long = { + value += delta + value + } + + def getAndAdd(delta: Long): Long = { + val result = value + value += delta + result + } + + def getAndIncrement(): Long = { + val v = value + value += 1 + v + } + + def incrementAndGet(): Long = { + value += 1 + value + } + + def getAndDecrement(): Long = { + val v = value + value -= 1 + v + } + + def decrementAndGet(): Long = { + value -= 1 + value + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 513f383..3b1a458 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -380,7 +380,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging topicPartition -> partitionState } - val version = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_0_9_0)) (1: Short) else (0: Short) + val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) (1: Short) else (0: Short) val updateMetadataRequest = if (version == 0) { http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala index c86e87b..cb08358 100644 --- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala @@ -724,10 +724,10 @@ object GroupCoordinator { // TODO: we store both group metadata and offset data here despite the topic name being offsets only val GroupMetadataTopicName = "__consumer_offsets" - def create(config: KafkaConfig, - zkUtils: ZkUtils, - replicaManager: ReplicaManager, - time: Time): GroupCoordinator = { + def apply(config: KafkaConfig, + zkUtils: ZkUtils, + replicaManager: ReplicaManager, + time: Time): GroupCoordinator = { val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize, loadBufferSize = config.offsetsLoadBufferSize, offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala index df30279..590db83 100644 --- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala @@ -16,8 +16,9 @@ */ package kafka.javaapi.message -import java.util.concurrent.atomic.AtomicLong import java.nio.ByteBuffer + +import kafka.common.LongRef import kafka.message._ import scala.collection.JavaConverters._ @@ -26,7 +27,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet { private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer) def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) { - this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), messages.asScala: _*).buffer) + this(new kafka.message.ByteBufferMessageSet(compressionCodec, new LongRef(0), messages.asScala: _*).buffer) } def this(messages: java.util.List[Message]) { http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/FileMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index fe31ad4..45b3df9 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -174,16 +174,17 @@ class FileMessageSet private[kafka](@volatile var file: File, } /** - * This method is called before we write messages to socket use zero-copy transfer. We need to - * make sure all the messages in the message set has expected magic value + * This method is called before we write messages to the socket using zero-copy transfer. We need to + * make sure all the messages in the message set have the expected magic value. + * * @param expectedMagicValue the magic value expected - * @return true if all messages has expected magic value, false otherwise + * @return true if all messages have expected magic value, false otherwise */ - override def magicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = { + override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = { var location = start val offsetAndSizeBuffer = ByteBuffer.allocate(MessageSet.LogOverhead) val crcAndMagicByteBuffer = ByteBuffer.allocate(Message.CrcLength + Message.MagicLength) - while(location < end) { + while (location < end) { offsetAndSizeBuffer.rewind() channel.read(offsetAndSizeBuffer, location) if (offsetAndSizeBuffer.hasRemaining) @@ -191,7 +192,7 @@ class FileMessageSet private[kafka](@volatile var file: File, offsetAndSizeBuffer.rewind() offsetAndSizeBuffer.getLong // skip offset field val messageSize = offsetAndSizeBuffer.getInt - if(messageSize < Message.MinMessageOverhead) + if (messageSize < Message.MinMessageOverhead) throw new IllegalStateException("Invalid message size: " + messageSize) crcAndMagicByteBuffer.rewind() channel.read(crcAndMagicByteBuffer, location + MessageSet.LogOverhead) @@ -203,15 +204,15 @@ class FileMessageSet private[kafka](@volatile var file: File, } /** - * Convert this message set to use specified message format. + * Convert this message set to use the specified message format. */ def toMessageFormat(toMagicValue: Byte): ByteBufferMessageSet = { val offsets = new ArrayBuffer[Long] val newMessages = new ArrayBuffer[Message] - this.iterator().foreach(messageAndOffset => { + this.foreach { messageAndOffset => val message = messageAndOffset.message if (message.compressionCodec == NoCompressionCodec) { - newMessages += messageAndOffset.message.toFormatVersion(toMagicValue) + newMessages += message.toFormatVersion(toMagicValue) offsets += messageAndOffset.offset } else { // File message set only has shallow iterator. We need to do deep iteration here if needed. @@ -221,19 +222,19 @@ class FileMessageSet private[kafka](@volatile var file: File, offsets += innerMessageAndOffset.offset } } - }) + } // We use the offset seq to assign offsets so the offset of the messages does not change. new ByteBufferMessageSet( compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec), - offsetSeq = offsets.toSeq, + offsetSeq = offsets, newMessages: _*) } /** * Get a shallow iterator over the messages in the set. */ - override def iterator() = iterator(Int.MaxValue) + override def iterator = iterator(Int.MaxValue) /** * Get an iterator over the messages in the set. We only do shallow iteration here. http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index f8c0b77..fd176b1 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -21,17 +21,16 @@ import kafka.utils._ import kafka.message._ import kafka.common._ import kafka.metrics.KafkaMetricsGroup -import kafka.server.{LogOffsetMetadata, FetchDataInfo, BrokerTopicStats} - -import java.io.{IOException, File} +import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata} +import java.io.{File, IOException} import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} import java.util.concurrent.atomic._ import java.text.NumberFormat -import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, CorruptRecordException} + +import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException} import org.apache.kafka.common.record.TimestampType import scala.collection.JavaConversions - import com.yammer.metrics.core.Gauge object LogAppendInfo { @@ -320,7 +319,7 @@ class Log(val dir: File, val appendInfo = analyzeAndValidateMessageSet(messages) // if we have any valid messages, append them to the log - if(appendInfo.shallowCount == 0) + if (appendInfo.shallowCount == 0) return appendInfo // trim any invalid bytes or partial messages before appending it to the on-disk log @@ -333,42 +332,46 @@ class Log(val dir: File, if (assignOffsets) { // assign offsets to the message set - val offset = new AtomicLong(nextOffsetMetadata.messageOffset) - val now = SystemTime.milliseconds - try { - validMessages = validMessages.validateMessagesAndAssignOffsets(offset, - now, - appendInfo.sourceCodec, - appendInfo.targetCodec, - config.compact, - config.messageFormatVersion, - config.messageTimestampType, - config.messageTimestampDifferenceMaxMs) + val offset = new LongRef(nextOffsetMetadata.messageOffset) + val now = time.milliseconds + val (validatedMessages, messageSizesMaybeChanged) = try { + validMessages.validateMessagesAndAssignOffsets(offset, + now, + appendInfo.sourceCodec, + appendInfo.targetCodec, + config.compact, + config.messageFormatVersion.messageFormatVersion, + config.messageTimestampType, + config.messageTimestampDifferenceMaxMs) } catch { case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e) } - appendInfo.lastOffset = offset.get - 1 - // If log append time is used, we put the timestamp assigned to the messages in the append info. + validMessages = validatedMessages + appendInfo.lastOffset = offset.value - 1 if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME) appendInfo.timestamp = now + + // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message + // format conversion) + if (messageSizesMaybeChanged) { + for (messageAndOffset <- validMessages.shallowIterator) { + if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) { + // we record the original message set size instead of the trimmed size + // to be consistent with pre-compression bytesRejectedRate recording + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes) + throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d." + .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize)) + } + } + } + } else { // we are taking the offsets we are given if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset) throw new IllegalArgumentException("Out of order offsets found in " + messages) } - // re-validate message sizes since after re-compression some may exceed the limit - for (messageAndOffset <- validMessages.shallowIterator) { - if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) { - // we record the original message set size instead of trimmed size - // to be consistent with pre-compression bytesRejectedRate recording - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes) - throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d." - .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize)) - } - } - // check messages set size may be exceed config.segmentSize if (validMessages.sizeInBytes > config.segmentSize) { throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d." http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/LogCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index a3aff15..a2e1913 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -370,7 +370,7 @@ private[log] class Cleaner(val id: Int, val retainDeletes = old.lastModified > deleteHorizonMs info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes." .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding")) - cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion) + cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion.messageFormatVersion) } // trim excess index @@ -430,28 +430,30 @@ private[log] class Cleaner(val id: Int, } messagesRead += 1 } else { - // We use absolute offset to decide whether retain the message or not. This is handled by + // We use the absolute offset to decide whether to retain the message or not. This is handled by the // deep iterator. val messages = ByteBufferMessageSet.deepIterator(entry) - var numberOfInnerMessages = 0 - var formatConversionNeeded = false - val retainedMessages = messages.filter(messageAndOffset => { + var writeOriginalMessageSet = true + val retainedMessages = new mutable.ArrayBuffer[MessageAndOffset] + messages.foreach { messageAndOffset => messagesRead += 1 - numberOfInnerMessages += 1 - if (messageAndOffset.message.magic != messageFormatVersion) - formatConversionNeeded = true - shouldRetainMessage(source, map, retainDeletes, messageAndOffset) - }).toSeq - - // There is no messages compacted out and no message format conversion, write the original message set back - if (retainedMessages.size == numberOfInnerMessages && !formatConversionNeeded) - ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset) - else if (retainedMessages.nonEmpty) { - val convertedRetainedMessages = retainedMessages.map(messageAndOffset => { - new MessageAndOffset(messageAndOffset.message.toFormatVersion(messageFormatVersion), messageAndOffset.offset) - }) - compressMessages(writeBuffer, entry.message.compressionCodec, messageFormatVersion, convertedRetainedMessages) + if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset)) { + retainedMessages += { + if (messageAndOffset.message.magic != messageFormatVersion) { + writeOriginalMessageSet = false + new MessageAndOffset(messageAndOffset.message.toFormatVersion(messageFormatVersion), messageAndOffset.offset) + } + else messageAndOffset + } + } + else writeOriginalMessageSet = false } + + // There are no messages compacted out and no message format conversion, write the original message set back + if (writeOriginalMessageSet) + ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset) + else if (retainedMessages.nonEmpty) + compressMessages(writeBuffer, entry.message.compressionCodec, messageFormatVersion, retainedMessages) } } @@ -474,27 +476,27 @@ private[log] class Cleaner(val id: Int, private def compressMessages(buffer: ByteBuffer, compressionCodec: CompressionCodec, messageFormatVersion: Byte, - messages: Seq[MessageAndOffset]) { - val messagesIterable = messages.toIterable.map(_.message) - if (messages.isEmpty) { + messageAndOffsets: Seq[MessageAndOffset]) { + val messages = messageAndOffsets.map(_.message) + if (messageAndOffsets.isEmpty) { MessageSet.Empty.sizeInBytes } else if (compressionCodec == NoCompressionCodec) { - for(messageOffset <- messages) + for (messageOffset <- messageAndOffsets) ByteBufferMessageSet.writeMessage(buffer, messageOffset.message, messageOffset.offset) - MessageSet.messageSetSize(messagesIterable) + MessageSet.messageSetSize(messages) } else { - val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages.map(_.message)) - val firstAbsoluteOffset = messages.head.offset + val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages) + val firstMessageOffset = messageAndOffsets.head + val firstAbsoluteOffset = firstMessageOffset.offset var offset = -1L - val timestampType = messages.head.message.timestampType - val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messagesIterable) / 2, 1024), 1 << 16)) + val timestampType = firstMessageOffset.message.timestampType + val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16)) messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = messageFormatVersion) { outputStream => val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream)) try { - for (messageOffset <- messages) { + for (messageOffset <- messageAndOffsets) { val message = messageOffset.message offset = messageOffset.offset - // Use inner offset when magic value is greater than 0 if (messageFormatVersion > Message.MagicValue_V0) { // The offset of the messages are absolute offset, compute the inner offset. val innerOffset = messageOffset.offset - firstAbsoluteOffset http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/LogConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index a8fffbd..a76dce7 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -19,6 +19,8 @@ package kafka.log import java.util.Properties +import scala.collection.JavaConverters._ + import kafka.api.ApiVersion import kafka.message.{BrokerCompressionCodec, Message} import kafka.server.KafkaConfig @@ -73,9 +75,9 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp) val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase val preallocate = getBoolean(LogConfig.PreAllocateEnableProp) - val messageFormatVersion = ApiVersion(getString(LogConfig.MessageFormatVersionProp)).messageFormatVersion + val messageFormatVersion = ApiVersion(getString(LogConfig.MessageFormatVersionProp)) val messageTimestampType = TimestampType.forName(getString(LogConfig.MessageTimestampTypeProp)) - val messageTimestampDifferenceMaxMs = getLong(LogConfig.MessageTimestampDifferenceMaxMsProp) + val messageTimestampDifferenceMaxMs = getLong(LogConfig.MessageTimestampDifferenceMaxMsProp).longValue def randomSegmentJitter: Long = if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs) @@ -177,12 +179,8 @@ object LogConfig { def apply(): LogConfig = LogConfig(new Properties()) - def configNames() = { - import scala.collection.JavaConversions._ - configDef.names().toList.sorted - } - - + def configNames: Seq[String] = configDef.names.asScala.toSeq.sorted + /** * Create a log config instance using the given properties and defaults */ @@ -197,10 +195,8 @@ object LogConfig { * Check that property names are valid */ def validateNames(props: Properties) { - import scala.collection.JavaConversions._ - val names = configDef.names() - for(name <- props.keys) - require(names.contains(name), "Unknown configuration \"%s\".".format(name)) + val names = configNames + for (name <- props.keys.asScala) require(names.contains(name), s"Unknown configuration `$name`.") } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 69386c1..b64fac6 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -19,11 +19,13 @@ package kafka.log import java.io._ import java.util.concurrent.TimeUnit + import kafka.utils._ + import scala.collection._ -import kafka.common.{TopicAndPartition, KafkaException} -import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint} -import java.util.concurrent.{Executors, ExecutorService, ExecutionException, Future} +import kafka.common.{KafkaException, TopicAndPartition} +import kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown} +import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future} /** * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. @@ -463,7 +465,7 @@ class LogManager(val logDirs: Array[File], /** * Get a map of TopicAndPartition => Log */ - def logsByTopicPartition = logs.toMap + def logsByTopicPartition: Map[TopicAndPartition, Log] = logs.toMap /** * Map of log dir to logs by topic and partitions in that dir http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 2867c78..856f971 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -18,33 +18,33 @@ package kafka.message import kafka.utils.{IteratorTemplate, Logging} -import kafka.common.KafkaException - +import kafka.common.{KafkaException, LongRef} import java.nio.ByteBuffer import java.nio.channels._ import java.io._ -import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} +import java.util.ArrayDeque +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.JavaConverters._ import org.apache.kafka.common.errors.InvalidTimestampException import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.utils.Utils import scala.collection.mutable -import scala.collection.mutable.ListBuffer object ByteBufferMessageSet { - private def create(offsetAssignor: OffsetAssigner, + private def create(offsetAssigner: OffsetAssigner, compressionCodec: CompressionCodec, wrapperMessageTimestamp: Option[Long], timestampType: TimestampType, messages: Message*): ByteBuffer = { - if(messages.size == 0) { + if (messages.isEmpty) MessageSet.Empty.buffer - } else if(compressionCodec == NoCompressionCodec) { + else if (compressionCodec == NoCompressionCodec) { val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) - for(message <- messages) - writeMessage(buffer, message, offsetAssignor.nextAbsoluteOffset) + for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset()) buffer.rewind() buffer } else { @@ -58,12 +58,12 @@ object ByteBufferMessageSet { val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream)) try { for (message <- messages) { - offset = offsetAssignor.nextAbsoluteOffset + offset = offsetAssigner.nextAbsoluteOffset() if (message.magic != magicAndTimestamp.magic) throw new IllegalArgumentException("Messages in the message set must have same magic value") // Use inner offset if magic value is greater than 0 if (magicAndTimestamp.magic > Message.MagicValue_V0) - output.writeLong(offsetAssignor.toInnerOffset(offset)) + output.writeLong(offsetAssigner.toInnerOffset(offset)) else output.writeLong(offset) output.writeInt(message.size) @@ -87,24 +87,22 @@ object ByteBufferMessageSet { new IteratorTemplate[MessageAndOffset] { - val wrapperMessageOffset = wrapperMessageAndOffset.offset - val wrapperMessage = wrapperMessageAndOffset.message + val MessageAndOffset(wrapperMessage, wrapperMessageOffset) = wrapperMessageAndOffset val wrapperMessageTimestampOpt: Option[Long] = if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestamp) else None val wrapperMessageTimestampTypeOpt: Option[TimestampType] = if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestampType) else None if (wrapperMessage.payload == null) - throw new KafkaException("Message payload is null: " + wrapperMessage) - val inputStream: InputStream = new ByteBufferBackedInputStream(wrapperMessage.payload) - val compressed: DataInputStream = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, inputStream)) + throw new KafkaException(s"Message payload is null: $wrapperMessage") + val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload) + val compressed = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, inputStream)) var lastInnerOffset = -1L val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) { - var innerMessageAndOffsets = new mutable.Queue[MessageAndOffset]() + val innerMessageAndOffsets = new ArrayDeque[MessageAndOffset]() try { - while (true) { - innerMessageAndOffsets += readMessageFromStream() - } + while (true) + innerMessageAndOffsets.add(readMessageFromStream()) } catch { case eofe: EOFException => compressed.close() @@ -112,23 +110,18 @@ object ByteBufferMessageSet { throw new KafkaException(ioe) } Some(innerMessageAndOffsets) - } else { - None - } + } else None - private def readMessageFromStream() = { - // read the offset + private def readMessageFromStream(): MessageAndOffset = { val innerOffset = compressed.readLong() - // read record size - val size = compressed.readInt() + val recordSize = compressed.readInt() - if (size < MinMessageOverhead) - throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator") + if (recordSize < MinMessageOverhead) + throw new InvalidMessageException(s"Message found with corrupt size `$recordSize` in deep iterator") - // read the record into an intermediate record buffer - // and hence has to do extra copy - val bufferArray = new Array[Byte](size) - compressed.readFully(bufferArray, 0, size) + // read the record into an intermediate record buffer (i.e. extra copy needed) + val bufferArray = new Array[Byte](recordSize) + compressed.readFully(bufferArray, 0, recordSize) val buffer = ByteBuffer.wrap(bufferArray) // Override the timestamp if necessary @@ -146,20 +139,17 @@ object ByteBufferMessageSet { messageAndOffsets match { // Using inner offset and timestamps case Some(innerMessageAndOffsets) => - if (innerMessageAndOffsets.isEmpty) - allDone() - else { - val messageAndOffset = innerMessageAndOffsets.dequeue() - val message = messageAndOffset.message - val relativeOffset = messageAndOffset.offset - lastInnerOffset - val absoluteOffset = wrapperMessageOffset + relativeOffset - new MessageAndOffset(message, absoluteOffset) + innerMessageAndOffsets.pollFirst() match { + case null => allDone() + case MessageAndOffset(message, offset) => + val relativeOffset = offset - lastInnerOffset + val absoluteOffset = wrapperMessageOffset + relativeOffset + new MessageAndOffset(message, absoluteOffset) } // Not using inner offset and timestamps case None => - try { - readMessageFromStream() - } catch { + try readMessageFromStream() + catch { case eofe: EOFException => compressed.close() allDone() @@ -185,17 +175,23 @@ object ByteBufferMessageSet { } } +private object OffsetAssigner { + + def apply(offsetCounter: LongRef, size: Int): OffsetAssigner = + new OffsetAssigner(offsetCounter.value to offsetCounter.addAndGet(size)) + +} + private class OffsetAssigner(offsets: Seq[Long]) { - val index = new AtomicInteger(0) + private var index = 0 - def this(offsetCounter: AtomicLong, size: Int) { - this((offsetCounter.get() to offsetCounter.get + size).toSeq) - offsetCounter.addAndGet(size) + def nextAbsoluteOffset(): Long = { + val result = offsets(index) + index += 1 + result } - def nextAbsoluteOffset = offsets(index.getAndIncrement) - - def toInnerOffset(offset: Long) = offset - offsets(0) + def toInnerOffset(offset: Long): Long = offset - offsets.head } @@ -209,17 +205,17 @@ private class OffsetAssigner(offsets: Seq[Long]) { * Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method. * * - * When message format v1 is used, there will be following message format changes. - * - For non-compressed messages, with message v1 we are adding timestamp and timestamp type attribute. The offsets of + * Message format v1 has the following changes: + * - For non-compressed messages, timestamp and timestamp type attributes have been added. The offsets of * the messages remain absolute offsets. - * - For Compressed messages, with message v1 we are adding timestamp, timestamp type attribute bit and using - * inner offsets (IO) for inner messages of compressed messages (see offset calculation details below). Timestamp type - * attribute is only set in wrapper messages. Inner messages always have CreateTime as timestamp type in attributes. + * - For compressed messages, timestamp and timestamp type attributes have been added and inner offsets (IO) are used + * for inner messages of compressed messages (see offset calculation details below). The timestamp type + * attribute is only set in wrapper messages. Inner messages always have CreateTime as the timestamp type in attributes. * - * The way timestamp set is following: - * For non-compressed messages: timestamp and timestamp type attribute in the messages are set and used. + * We set the timestamp in the following way: + * For non-compressed messages: the timestamp and timestamp type message attributes are set and used. * For compressed messages: - * 1. Wrapper messages' timestamp type attribute is set to proper value + * 1. Wrapper messages' timestamp type attribute is set to the proper value * 2. Wrapper messages' timestamp is set to: * - the max timestamp of inner messages if CreateTime is used * - the current server time if wrapper message's timestamp = LogAppendTime. @@ -227,11 +223,10 @@ private class OffsetAssigner(offsets: Seq[Long]) { * 3. Inner messages' timestamp will be: * - used when wrapper message's timestamp type is CreateTime * - ignored when wrapper message's timestamp type is LogAppendTime - * 4. Inner messages' timestamp type will always be ignored. However, producer must set the inner message timestamp - * type to CreateTime, otherwise the messages will be rejected by broker. + * 4. Inner messages' timestamp type will always be ignored with one exception: producers must set the inner message + * timestamp type to CreateTime, otherwise the messages will be rejected by broker. * - * - * The way absolute offset calculated is the following: + * Absolute offsets are calculated in the following way: * Ideally the conversion from relative offset(RO) to absolute offset(AO) should be: * * AO = AO_Of_Last_Inner_Message + RO @@ -240,7 +235,7 @@ private class OffsetAssigner(offsets: Seq[Long]) { * And the relative offset of an inner message compared with the last inner message is not known until * the last inner message is written. * Unfortunately we are not able to change the previously written messages after the last message is written to - * the message set when stream compressing is used. + * the message set when stream compression is used. * * To solve this issue, we use the following solution: * @@ -254,20 +249,23 @@ private class OffsetAssigner(offsets: Seq[Long]) { * RO = IO_of_a_message - IO_of_the_last_message * AO = AO_Of_Last_Inner_Message + RO * - * 4. This solution works for compacted message set as well + * 4. This solution works for compacted message sets as well. * */ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging { private var shallowValidByteCount = -1 - def this(compressionCodec: CompressionCodec, messages: Message*) { - this(ByteBufferMessageSet.create(new OffsetAssigner(new AtomicLong(0), messages.size), compressionCodec, - None, TimestampType.CREATE_TIME, messages:_*)) + private[kafka] def this(compressionCodec: CompressionCodec, + offsetCounter: LongRef, + wrapperMessageTimestamp: Option[Long], + timestampType: TimestampType, + messages: Message*) { + this(ByteBufferMessageSet.create(OffsetAssigner(offsetCounter, messages.size), compressionCodec, + wrapperMessageTimestamp, timestampType, messages:_*)) } - def this(compressionCodec: CompressionCodec, offsetCounter: AtomicLong, messages: Message*) { - this(ByteBufferMessageSet.create(new OffsetAssigner(offsetCounter, messages.size), compressionCodec, - None, TimestampType.CREATE_TIME, messages:_*)) + def this(compressionCodec: CompressionCodec, offsetCounter: LongRef, messages: Message*) { + this(compressionCodec, offsetCounter, None, TimestampType.CREATE_TIME, messages:_*) } def this(compressionCodec: CompressionCodec, offsetSeq: Seq[Long], messages: Message*) { @@ -275,31 +273,21 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi None, TimestampType.CREATE_TIME, messages:_*)) } - def this(messages: Message*) { - this(NoCompressionCodec, new AtomicLong(0), messages: _*) + def this(compressionCodec: CompressionCodec, messages: Message*) { + this(compressionCodec, new LongRef(0L), messages: _*) } - // This constructor is only used internally - private[kafka] def this(compressionCodec: CompressionCodec, - offsetCounter: AtomicLong, - wrapperMessageTimestamp: Option[Long], - timestampType: TimestampType, - messages: Message*) { - this(ByteBufferMessageSet.create(new OffsetAssigner(offsetCounter, messages.size), compressionCodec, - wrapperMessageTimestamp, timestampType, messages:_*)) + def this(messages: Message*) { + this(NoCompressionCodec, messages: _*) } def getBuffer = buffer private def shallowValidBytes: Int = { - if(shallowValidByteCount < 0) { - var bytes = 0 - val iter = this.internalIterator(true) - while(iter.hasNext) { - val messageAndOffset = iter.next - bytes += MessageSet.entrySize(messageAndOffset.message) - } - this.shallowValidByteCount = bytes + if (shallowValidByteCount < 0) { + this.shallowValidByteCount = this.internalIterator(isShallow = true).map { messageAndOffset => + MessageSet.entrySize(messageAndOffset.message) + }.sum } shallowValidByteCount } @@ -315,7 +303,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi written } - override def magicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = { + override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = { for (messageAndOffset <- shallowIterator) { if (messageAndOffset.message.magic != expectedMagicValue) return false @@ -398,27 +386,28 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi * * If no format conversion or value overwriting is required for messages, this method will perform in-place * operations and avoid re-compression. + * + * Returns the message set and a boolean indicating whether the message sizes may have changed. */ - private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: AtomicLong, - now: Long = System.currentTimeMillis(), + private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: LongRef, + now: Long, sourceCodec: CompressionCodec, targetCodec: CompressionCodec, compactedTopic: Boolean = false, messageFormatVersion: Byte = Message.CurrentMagicValue, messageTimestampType: TimestampType, - messageTimestampDiffMaxMs: Long): ByteBufferMessageSet = { + messageTimestampDiffMaxMs: Long): (ByteBufferMessageSet, Boolean) = { if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { // check the magic value - if (!magicValueInAllWrapperMessages(messageFormatVersion)) { + if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) { // Message format conversion - convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs, - messageFormatVersion) + (convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs, + messageFormatVersion), true) } else { // Do in-place validation, offset assignment and maybe set timestamp - validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType, - messageTimestampDiffMaxMs) + (validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType, + messageTimestampDiffMaxMs), false) } - } else { // Deal with compressed messages // We cannot do in place assignment in one of the following situations: @@ -432,8 +421,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi var maxTimestamp = Message.NoTimestamp val expectedInnerOffset = new AtomicLong(0) - val validatedMessages = new ListBuffer[Message] - this.internalIterator(isShallow = false).foreach(messageAndOffset => { + val validatedMessages = new mutable.ArrayBuffer[Message] + this.internalIterator(isShallow = false).foreach { messageAndOffset => val message = messageAndOffset.message validateMessageKey(message, compactedTopic) if (message.magic > Message.MagicValue_V0 && messageFormatVersion > Message.MagicValue_V0) { @@ -441,7 +430,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi // Validate the timestamp validateTimestamp(message, now, messageTimestampType, messageTimestampDiffMaxMs) // Check if we need to overwrite offset - if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement) + if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement()) inPlaceAssignment = false maxTimestamp = math.max(maxTimestamp, message.timestamp) } @@ -451,7 +440,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi inPlaceAssignment = false validatedMessages += message.toFormatVersion(messageFormatVersion) - }) + } if (!inPlaceAssignment) { // Cannot do in place assignment. val wrapperMessageTimestamp = { @@ -463,11 +452,11 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi Some(now) } - new ByteBufferMessageSet(compressionCodec = targetCodec, - offsetCounter = offsetCounter, - wrapperMessageTimestamp = wrapperMessageTimestamp, - timestampType = messageTimestampType, - messages = validatedMessages.toBuffer: _*) + (new ByteBufferMessageSet(compressionCodec = targetCodec, + offsetCounter = offsetCounter, + wrapperMessageTimestamp = wrapperMessageTimestamp, + timestampType = messageTimestampType, + messages = validatedMessages: _*), true) } else { // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message. buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1) @@ -480,53 +469,49 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi val timestamp = buffer.getLong(timestampOffset) val attributes = buffer.get(attributeOffset) if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp) - // We don't need to recompute crc if the timestamp is not updated. - crcUpdateNeeded = false + // We don't need to recompute crc if the timestamp is not updated. + crcUpdateNeeded = false else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) { // Set timestamp type and timestamp buffer.putLong(timestampOffset, now) - buffer.put(attributeOffset, TimestampType.setTimestampType(attributes, TimestampType.LOG_APPEND_TIME)) + buffer.put(attributeOffset, messageTimestampType.updateAttributes(attributes)) } if (crcUpdateNeeded) { // need to recompute the crc value buffer.position(MessageSet.LogOverhead) val wrapperMessage = new Message(buffer.slice()) - Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum()) + Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum) } buffer.rewind() - this + (this, false) } } } - // We create this method to save memory copy operation. It reads from the original message set and directly + // We create this method to avoid a memory copy. It reads from the original message set and directly // writes the converted messages into new message set buffer. Hence we don't need to allocate memory for each // individual message during message format conversion. - private def convertNonCompressedMessages(offsetCounter: AtomicLong, + private def convertNonCompressedMessages(offsetCounter: LongRef, compactedTopic: Boolean, now: Long, timestampType: TimestampType, messageTimestampDiffMaxMs: Long, toMagicValue: Byte): ByteBufferMessageSet = { - val sizeInBytesAfterConversion = shallowValidBytes + this.internalIterator(isShallow = true).foldLeft(0)( - (sizeDiff, messageAndOffset) => sizeDiff + Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue)) + val sizeInBytesAfterConversion = shallowValidBytes + this.internalIterator(isShallow = true).map { messageAndOffset => + Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue) + }.sum val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion) var newMessagePosition = 0 - this.internalIterator(isShallow = true).foreach {messageAndOffset => - val message = messageAndOffset.message + this.internalIterator(isShallow = true).foreach { case MessageAndOffset(message, _) => validateMessageKey(message, compactedTopic) validateTimestamp(message, now, timestampType, messageTimestampDiffMaxMs) newBuffer.position(newMessagePosition) - // write offset. - newBuffer.putLong(offsetCounter.getAndIncrement) - // Write new message size + newBuffer.putLong(offsetCounter.getAndIncrement()) val newMessageSize = message.size + Message.headerSizeDiff(message.magic, toMagicValue) newBuffer.putInt(newMessageSize) - // Create new message buffer val newMessageBuffer = newBuffer.slice() newMessageBuffer.limit(newMessageSize) - // Convert message message.convertToBuffer(toMagicValue, newMessageBuffer, now, timestampType) newMessagePosition += MessageSet.LogOverhead + newMessageSize @@ -535,7 +520,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi new ByteBufferMessageSet(newBuffer) } - private def validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: AtomicLong, + private def validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: LongRef, now: Long, compactedTopic: Boolean, timestampType: TimestampType, @@ -555,8 +540,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi validateTimestamp(message, now, timestampType, timestampDiffMaxMs) if (timestampType == TimestampType.LOG_APPEND_TIME) { message.buffer.putLong(Message.TimestampOffset, now) - message.buffer.put(Message.AttributesOffset, TimestampType.setTimestampType(message.attributes, TimestampType.LOG_APPEND_TIME)) - Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, message.computeChecksum()) + message.buffer.put(Message.AttributesOffset, timestampType.updateAttributes(message.attributes)) + Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, message.computeChecksum) } } messagePosition += MessageSet.LogOverhead + messageSize http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/message/Message.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index 51aa11a..2ab2e0c 100755 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -50,8 +50,8 @@ object Message { val ValueSizeLength = 4 private val MessageHeaderSizeMap = Map ( - 0.asInstanceOf[Byte] -> (CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength), - 1.asInstanceOf[Byte] -> (CrcLength + MagicLength + AttributesLength + TimestampLength + KeySizeLength + ValueSizeLength)) + (0: Byte) -> (CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength), + (1: Byte) -> (CrcLength + MagicLength + AttributesLength + TimestampLength + KeySizeLength + ValueSizeLength)) /** * The amount of overhead bytes in a message @@ -123,10 +123,10 @@ object Message { * * Default constructor wraps an existing ByteBuffer with the Message object with no change to the contents. * @param buffer the byte buffer of this message. - * @param wrapperMessageTimestamp the wrapper message timestamp, only not None when the message is an inner message - * of a compressed message. - * @param wrapperMessageTimestampType the wrapper message timestamp type, only not None when the message is an inner - * message of a compressed message. + * @param wrapperMessageTimestamp the wrapper message timestamp, which is only defined when the message is an inner + * message of a compressed message. + * @param wrapperMessageTimestampType the wrapper message timestamp type, which is only defined when the message is an + * inner message of a compressed message. */ class Message(val buffer: ByteBuffer, private val wrapperMessageTimestamp: Option[Long] = None, @@ -168,11 +168,10 @@ class Message(val buffer: ByteBuffer, // skip crc, we will fill that in at the end buffer.position(MagicOffset) buffer.put(magicValue) - var attributes: Byte = 0 - if (codec.codec > 0) { - attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte - attributes = TimestampType.setTimestampType(attributes, timestampType) - } + val attributes: Byte = + if (codec.codec > 0) + timestampType.updateAttributes((CompressionCodeMask & codec.codec).toByte) + else 0 buffer.put(attributes) // Only put timestamp when "magic" value is greater than 0 if (magic > MagicValue_V0) @@ -213,7 +212,7 @@ class Message(val buffer: ByteBuffer, /** * Compute the checksum of the message from the message contents */ - def computeChecksum(): Long = + def computeChecksum: Long = CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset, buffer.limit - MagicOffset) /** @@ -231,7 +230,7 @@ class Message(val buffer: ByteBuffer, */ def ensureValid() { if(!isValid) - throw new InvalidMessageException("Message is corrupt (stored crc = " + checksum + ", computed crc = " + computeChecksum() + ")") + throw new InvalidMessageException(s"Message is corrupt (stored crc = ${checksum}, computed crc = ${computeChecksum})") } /** @@ -242,7 +241,7 @@ class Message(val buffer: ByteBuffer, /** * The position where the key size is stored. */ - def keySizeOffset = { + private def keySizeOffset = { if (magic == MagicValue_V0) KeySizeOffset_V0 else KeySizeOffset_V1 } @@ -260,7 +259,7 @@ class Message(val buffer: ByteBuffer, /** * The position where the payload size is stored */ - def payloadSizeOffset = { + private def payloadSizeOffset = { if (magic == MagicValue_V0) KeyOffset_V0 + max(0, keySize) else KeyOffset_V1 + max(0, keySize) } @@ -273,7 +272,7 @@ class Message(val buffer: ByteBuffer, /** * Is the payload of this message null */ - def isNull(): Boolean = payloadSize < 0 + def isNull: Boolean = payloadSize < 0 /** * The magic version of this message @@ -309,7 +308,7 @@ class Message(val buffer: ByteBuffer, if (magic == MagicValue_V0) TimestampType.NO_TIMESTAMP_TYPE else - wrapperMessageTimestampType.getOrElse(TimestampType.getTimestampType(attributes)) + wrapperMessageTimestampType.getOrElse(TimestampType.forAttributes(attributes)) } /** @@ -337,23 +336,23 @@ class Message(val buffer: ByteBuffer, else { val byteBuffer = ByteBuffer.allocate(size + Message.headerSizeDiff(magic, toMagicValue)) // Copy bytes from old messages to new message - convertToBuffer(toMagicValue, byteBuffer) + convertToBuffer(toMagicValue, byteBuffer, Message.NoTimestamp) new Message(byteBuffer) } } def convertToBuffer(toMagicValue: Byte, byteBuffer: ByteBuffer, - now: Long = NoTimestamp, - timestampType: TimestampType = wrapperMessageTimestampType.getOrElse(TimestampType.getTimestampType(attributes))) { + now: Long, + timestampType: TimestampType = wrapperMessageTimestampType.getOrElse(TimestampType.forAttributes(attributes))) { if (byteBuffer.remaining() < size + headerSizeDiff(magic, toMagicValue)) throw new IndexOutOfBoundsException("The byte buffer does not have enough capacity to hold new message format " + - "version " + toMagicValue) + s"version $toMagicValue") if (toMagicValue == Message.MagicValue_V1) { // Up-conversion, reserve CRC and update magic byte byteBuffer.position(Message.MagicOffset) byteBuffer.put(Message.MagicValue_V1) - byteBuffer.put(TimestampType.setTimestampType(attributes, timestampType)) + byteBuffer.put(timestampType.updateAttributes(attributes)) // Up-conversion, insert the timestamp field if (timestampType == TimestampType.LOG_APPEND_TIME) byteBuffer.putLong(now) @@ -364,13 +363,13 @@ class Message(val buffer: ByteBuffer, // Down-conversion, reserve CRC and update magic byte byteBuffer.position(Message.MagicOffset) byteBuffer.put(Message.MagicValue_V0) - byteBuffer.put(TimestampType.setTimestampType(attributes, TimestampType.CREATE_TIME)) + byteBuffer.put(TimestampType.CREATE_TIME.updateAttributes(attributes)) // Down-conversion, skip the timestamp field byteBuffer.put(buffer.array(), buffer.arrayOffset() + Message.KeySizeOffset_V1, size - Message.KeySizeOffset_V1) } // update crc value val newMessage = new Message(byteBuffer) - Utils.writeUnsignedInt(byteBuffer, Message.CrcOffset, newMessage.computeChecksum()) + Utils.writeUnsignedInt(byteBuffer, Message.CrcOffset, newMessage.computeChecksum) byteBuffer.rewind() } @@ -382,7 +381,7 @@ class Message(val buffer: ByteBuffer, if(size < 0) { null } else { - var b = buffer.duplicate + var b = buffer.duplicate() b.position(start + 4) b = b.slice() b.limit(size) @@ -396,9 +395,9 @@ class Message(val buffer: ByteBuffer, */ private def validateTimestampAndMagicValue(timestamp: Long, magic: Byte) { if (magic != MagicValue_V0 && magic != MagicValue_V1) - throw new IllegalArgumentException("Invalid magic value " + magic) + throw new IllegalArgumentException(s"Invalid magic value $magic") if (timestamp < 0 && timestamp != NoTimestamp) - throw new IllegalArgumentException("Invalid message timestamp " + timestamp) + throw new IllegalArgumentException(s"Invalid message timestamp $timestamp") if (magic == MagicValue_V0 && timestamp != NoTimestamp) throw new IllegalArgumentException(s"Invalid timestamp $timestamp. Timestamp must be ${NoTimestamp} when magic = ${MagicValue_V0}") } http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/message/MessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala index 014788a..14c455c 100644 --- a/core/src/main/scala/kafka/message/MessageSet.scala +++ b/core/src/main/scala/kafka/message/MessageSet.scala @@ -37,30 +37,16 @@ object MessageSet { messages.foldLeft(0)(_ + entrySize(_)) /** - * The size of a list of messages - */ - def messageSetSize(messages: java.util.List[Message]): Int = { - var size = 0 - val iter = messages.iterator - while(iter.hasNext) { - val message = iter.next - size += entrySize(message) - } - size - } - - /** * The size of a size-delimited entry in a message set */ def entrySize(message: Message): Int = LogOverhead + message.size /** - * Validate the "magic" values of messages are the same in a compressed message set and return the magic value of - * and the max timestamp of the inner messages. + * Validate that all "magic" values in `messages` are the same and return their magic value and max timestamp */ def magicAndLargestTimestamp(messages: Seq[Message]): MagicAndTimestamp = { val firstMagicValue = messages.head.magic - var largestTimestamp: Long = Message.NoTimestamp + var largestTimestamp = Message.NoTimestamp for (message <- messages) { if (message.magic != firstMagicValue) throw new IllegalStateException("Messages in the same message set must have same magic value") @@ -92,7 +78,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] { /** * Check if all the wrapper messages in the message set have the expected magic value */ - def magicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean + def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean /** * Provides an iterator over the message/offset pairs in this set http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/message/MessageWriter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/MessageWriter.scala b/core/src/main/scala/kafka/message/MessageWriter.scala index 660772c..e6954ff 100755 --- a/core/src/main/scala/kafka/message/MessageWriter.scala +++ b/core/src/main/scala/kafka/message/MessageWriter.scala @@ -40,7 +40,7 @@ class MessageWriter(segmentSize: Int) extends BufferingOutputStream(segmentSize) if (codec.codec > 0) attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte if (magicValue > MagicValue_V0) - attributes = TimestampType.setTimestampType(attributes, timestampType) + attributes = timestampType.updateAttributes(attributes) write(attributes) // Write timestamp if (magicValue > MagicValue_V0) http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/server/ConfigHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 9343fde..4bdd308 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -20,14 +20,13 @@ package kafka.server import java.util.Properties import kafka.api.ApiVersion -import kafka.common.TopicAndPartition -import kafka.log.{Log, LogConfig, LogManager} +import kafka.log.{LogConfig, LogManager} import kafka.utils.Logging import org.apache.kafka.common.metrics.Quota import org.apache.kafka.common.protocol.ApiKeys -import scala.collection.mutable import scala.collection.Map +import scala.collection.JavaConverters._ /** * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager @@ -42,29 +41,27 @@ trait ConfigHandler { */ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig) extends ConfigHandler with Logging { - def processConfigChanges(topic : String, topicConfig : Properties) { - val logs: mutable.Buffer[(TopicAndPartition, Log)] = logManager.logsByTopicPartition.toBuffer - val logsByTopic: Map[String, mutable.Buffer[Log]] = logs.groupBy{ case (topicAndPartition, log) => topicAndPartition.topic } - .mapValues{ case v: mutable.Buffer[(TopicAndPartition, Log)] => v.map(_._2) } + def processConfigChanges(topic: String, topicConfig: Properties) { // Validate the compatibility of message format version. - Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)) match { - case Some(versionString) => - if (!kafkaConfig.interBrokerProtocolVersion.onOrAfter(ApiVersion(versionString))) { - topicConfig.remove(LogConfig.MessageFormatVersionProp) - warn(s"Log configuration ${LogConfig.MessageFormatVersionProp} is ignored for $topic because $versionString " + - s"is not compatible with Kafka inter broker protocol version ${kafkaConfig.interBrokerProtocolVersion}") - } - case _ => + val configNameToExclude = Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)).flatMap { versionString => + if (kafkaConfig.interBrokerProtocolVersion < ApiVersion(versionString)) { + warn(s"Log configuration ${LogConfig.MessageFormatVersionProp} is ignored for `$topic` because `$versionString` " + + s"is not compatible with Kafka inter-broker protocol version `${kafkaConfig.interBrokerProtocolVersionString}`") + Some(LogConfig.MessageFormatVersionProp) + } + else None } - if (logsByTopic.contains(topic)) { + val logs = logManager.logsByTopicPartition.filterKeys(_.topic == topic).values.toBuffer + if (logs.nonEmpty) { /* combine the default properties with the overrides in zk to create the new LogConfig */ val props = new Properties() props.putAll(logManager.defaultConfig.originals) - props.putAll(topicConfig) + topicConfig.asScala.foreach { case (key, value) => + if (key != configNameToExclude) props.put(key, value) + } val logConfig = LogConfig(props) - for (log <- logsByTopic(topic)) - log.config = logConfig + logs.foreach(_.config = logConfig) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bd02630..2a289b4 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -368,12 +368,10 @@ class KafkaApis(val requestChannel: RequestChannel, val respHeader = new ResponseHeader(request.header.correlationId) val respBody = request.header.apiVersion match { case 0 => new ProduceResponse(mergedResponseStatus.asJava) - case 1 => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, 1) - case 2 => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, 2) + case version@ (1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version) // This case shouldn't happen unless a new version of ProducerRequest is added without // updating this part of the code to handle it properly. - case _ => throw new IllegalArgumentException("Version %d of ProducerRequest is not handled. Code must be updated." - .format(request.header.apiVersion)) + case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.") } requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, respBody))) @@ -424,52 +422,51 @@ class KafkaApis(val requestChannel: RequestChannel, case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic)) } - val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty)) + val unauthorizedPartitionData = unauthorizedRequestInfo.mapValues { _ => + FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty) + } // the callback for sending a fetch response def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) { - val convertedResponseStatus = + val convertedPartitionData = // Need to down-convert message when consumer only takes magic value 0. if (fetchRequest.versionId <= 1) { - responsePartitionData.map({ case (tp, data) => - tp -> { - // We only do down-conversion when: - // 1. The message format version configured for the topic is using magic value > 0, and - // 2. The message set contains message whose magic > 0 - // This is to reduce the message format conversion as much as possible. The conversion will only occur - // when new message format is used for the topic and we see an old request. - // Please notice that if the message format is changed from a higher version back to lower version this - // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0 - // without format down conversion. - if (replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) && - !data.messages.magicValueInAllWrapperMessages(Message.MagicValue_V0)) { - trace("Down converting message to V0 for fetch request from " + fetchRequest.clientId) - new FetchResponsePartitionData(data.error, data.hw, data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0)) - } else - data - } - }) - } else - responsePartitionData + responsePartitionData.map { case (tp, data) => + + // We only do down-conversion when: + // 1. The message format version configured for the topic is using magic value > 0, and + // 2. The message set contains message whose magic > 0 + // This is to reduce the message format conversion as much as possible. The conversion will only occur + // when new message format is used for the topic and we see an old request. + // Please note that if the message format is changed from a higher version back to lower version this + // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0 + // without format down conversion. + val convertedData = if (replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) && + !data.messages.isMagicValueInAllWrapperMessages(Message.MagicValue_V0)) { + trace(s"Down converting message to V0 for fetch request from ${fetchRequest.clientId}") + new FetchResponsePartitionData(data.error, data.hw, data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0)) + } else data + + tp -> convertedData + } + } else responsePartitionData - val mergedResponseStatus = convertedResponseStatus ++ unauthorizedResponseStatus + val mergedPartitionData = convertedPartitionData ++ unauthorizedPartitionData - mergedResponseStatus.foreach { case (topicAndPartition, data) => - if (data.error != Errors.NONE.code) { - debug("Fetch request with correlation id %d from client %s on partition %s failed due to %s" - .format(fetchRequest.correlationId, fetchRequest.clientId, - topicAndPartition, Errors.forCode(data.error).exceptionName)) - } + mergedPartitionData.foreach { case (topicAndPartition, data) => + if (data.error != Errors.NONE.code) + debug(s"Fetch request with correlation id ${fetchRequest.correlationId} from client ${fetchRequest.clientId} " + + s"on partition $topicAndPartition failed due to ${Errors.forCode(data.error).exceptionName}") // record the bytes out metrics only when the response is being sent BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes) BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes) } def fetchResponseCallback(delayTimeMs: Int) { - trace(s"Sending fetch response to ${fetchRequest.clientId} with ${convertedResponseStatus.values.map(_.messages.sizeInBytes).sum}" + - s" bytes") - val response = FetchResponse(fetchRequest.correlationId, mergedResponseStatus, fetchRequest.versionId, delayTimeMs) + trace(s"Sending fetch response to client ${fetchRequest.clientId} of " + + s"${convertedPartitionData.values.map(_.messages.sizeInBytes).sum} bytes") + val response = FetchResponse(fetchRequest.correlationId, mergedPartitionData, fetchRequest.versionId, delayTimeMs) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response))) } @@ -482,7 +479,7 @@ class KafkaApis(val requestChannel: RequestChannel, fetchResponseCallback(0) } else { quotaManagers(ApiKeys.FETCH.id).recordAndMaybeThrottle(fetchRequest.clientId, - FetchResponse.responseSize(responsePartitionData.groupBy(_._1.topic), + FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic), fetchRequest.versionId), fetchResponseCallback) }