KAFKA-3259; KAFKA-3253; KIP-31/KIP-32 Follow-up

This PR includes a number of clean-ups:
* Code style
* Documentation wording improvements
* Efficiency improvements

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Jun Rao <jun...@gmail.com>

Closes #943 from ijuma/kafka-3259-kip-31-32-clean-ups


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01aeea7c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01aeea7c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01aeea7c

Branch: refs/heads/trunk
Commit: 01aeea7c7bca34f1edce40116b7721335938b13b
Parents: 1bfadda
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Wed Feb 24 11:28:53 2016 -0800
Committer: Jun Rao <jun...@gmail.com>
Committed: Wed Feb 24 11:28:53 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/common/record/Record.java  |   2 +-
 .../kafka/common/record/TimestampType.java      |  32 +--
 .../org/apache/kafka/common/utils/Utils.java    |   2 +-
 core/src/main/scala/kafka/api/ApiVersion.scala  |  14 +-
 core/src/main/scala/kafka/common/LongRef.scala  |  61 +++++
 .../controller/ControllerChannelManager.scala   |   2 +-
 .../kafka/coordinator/GroupCoordinator.scala    |   8 +-
 .../javaapi/message/ByteBufferMessageSet.scala  |   5 +-
 .../main/scala/kafka/log/FileMessageSet.scala   |  25 +-
 core/src/main/scala/kafka/log/Log.scala         |  65 ++---
 core/src/main/scala/kafka/log/LogCleaner.scala  |  62 ++---
 core/src/main/scala/kafka/log/LogConfig.scala   |  20 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  10 +-
 .../kafka/message/ByteBufferMessageSet.scala    | 239 +++++++++----------
 core/src/main/scala/kafka/message/Message.scala |  53 ++--
 .../main/scala/kafka/message/MessageSet.scala   |  20 +-
 .../scala/kafka/message/MessageWriter.scala     |   2 +-
 .../main/scala/kafka/server/ConfigHandler.scala |  35 ++-
 .../src/main/scala/kafka/server/KafkaApis.scala |  71 +++---
 .../main/scala/kafka/server/KafkaConfig.scala   |  34 ++-
 .../main/scala/kafka/server/KafkaServer.scala   |  12 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   4 +-
 .../scala/kafka/server/ReplicaManager.scala     |  29 +--
 .../scala/kafka/tools/SimpleConsumerShell.scala |   2 +-
 .../kafka/api/BaseConsumerTest.scala            |  18 +-
 .../kafka/api/PlaintextConsumerTest.scala       |   2 +-
 .../kafka/consumer/ConsumerIteratorTest.scala   |   8 +-
 .../GroupCoordinatorResponseTest.scala          |   2 +-
 .../test/scala/unit/kafka/log/CleanerTest.scala |  13 +-
 .../unit/kafka/log/FileMessageSetTest.scala     |   4 +-
 .../scala/unit/kafka/log/LogConfigTest.scala    |  22 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |   1 -
 .../scala/unit/kafka/log/LogSegmentTest.scala   |   9 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  34 +--
 .../message/ByteBufferMessageSetTest.scala      | 136 ++++++-----
 .../unit/kafka/server/KafkaConfigTest.scala     |   2 +-
 docs/upgrade.html                               |  44 ++--
 37 files changed, 575 insertions(+), 529 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java 
b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 8390dc7..147ad86 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -347,7 +347,7 @@ public final class Record {
         if (magic() == 0)
             return TimestampType.NO_TIMESTAMP_TYPE;
         else
-            return wrapperRecordTimestampType == null ? TimestampType.getTimestampType(attributes()) : wrapperRecordTimestampType;
+            return wrapperRecordTimestampType == null ? TimestampType.forAttributes(attributes()) : wrapperRecordTimestampType;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java 
b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
index ab12a35..62fd814 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
@@ -25,34 +25,28 @@ import java.util.NoSuchElementException;
 public enum TimestampType {
     NO_TIMESTAMP_TYPE(-1, "NoTimestampType"), CREATE_TIME(0, "CreateTime"), LOG_APPEND_TIME(1, "LogAppendTime");
 
-    public final int value;
+    public final int id;
     public final String name;
-    TimestampType(int value, String name) {
-        this.value = value;
+    TimestampType(int id, String name) {
+        this.id = id;
         this.name = name;
     }
 
-    public static TimestampType getTimestampType(byte attributes) {
-        int timestampType = (attributes & Record.TIMESTAMP_TYPE_MASK) >> Record.TIMESTAMP_TYPE_ATTRIBUTE_OFFSET;
-        return timestampType == 0 ? CREATE_TIME : LOG_APPEND_TIME;
+    public byte updateAttributes(byte attributes) {
+        return this == CREATE_TIME ?
+            (byte) (attributes & ~Record.TIMESTAMP_TYPE_MASK) : (byte) (attributes | Record.TIMESTAMP_TYPE_MASK);
     }
 
-    public static byte setTimestampType(byte attributes, TimestampType timestampType) {
-        return timestampType == CREATE_TIME ?
-                (byte) (attributes & ~Record.TIMESTAMP_TYPE_MASK) : (byte) (attributes | Record.TIMESTAMP_TYPE_MASK);
+    public static TimestampType forAttributes(byte attributes) {
+        int timestampType = (attributes & Record.TIMESTAMP_TYPE_MASK) >> Record.TIMESTAMP_TYPE_ATTRIBUTE_OFFSET;
+        return timestampType == 0 ? CREATE_TIME : LOG_APPEND_TIME;
     }
 
     public static TimestampType forName(String name) {
-        switch (name) {
-            case "NoTimestampType":
-                return NO_TIMESTAMP_TYPE;
-            case "CreateTime":
-                return CREATE_TIME;
-            case "LogAppendTime":
-                return LOG_APPEND_TIME;
-            default:
-                throw new NoSuchElementException("Invalid timestamp type " + name);
-        }
+        for (TimestampType t : values())
+            if (t.name.equals(name))
+                return t;
+        throw new NoSuchElementException("Invalid timestamp type " + name);
     }
 
     @Override

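A note on the TimestampType change above: getTimestampType/setTimestampType become forAttributes/updateAttributes, and both operate on the timestamp-type bit of the attributes byte. A minimal round-trip sketch (illustrative only, not part of the patch):

    import org.apache.kafka.common.record.TimestampType

    // Starting from empty attributes (0), LOG_APPEND_TIME sets the timestamp-type bit...
    val attrs: Byte = TimestampType.LOG_APPEND_TIME.updateAttributes(0.toByte)
    assert(TimestampType.forAttributes(attrs) == TimestampType.LOG_APPEND_TIME)
    // ...and CREATE_TIME clears it again.
    assert(TimestampType.forAttributes(TimestampType.CREATE_TIME.updateAttributes(attrs)) == TimestampType.CREATE_TIME)
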
http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 8df54a4..daef458 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -166,7 +166,7 @@ public class Utils {
      * @param buffer The buffer to write to
      * @param value The value to write
      */
-    public static void writetUnsignedInt(ByteBuffer buffer, long value) {
+    public static void writeUnsignedInt(ByteBuffer buffer, long value) {
         buffer.putInt((int) (value & 0xffffffffL));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala 
b/core/src/main/scala/kafka/api/ApiVersion.scala
index 6b5fb7c..e2cadd1 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -51,9 +51,14 @@ object ApiVersion {
     "0.10.0" -> KAFKA_0_10_0_IV0
   )
 
-  def apply(version: String): ApiVersion  = versionNameMap(version.split("\\.").slice(0,3).mkString("."))
+  private val versionPattern = "\\.".r
+
+  def apply(version: String): ApiVersion =
+    versionNameMap.getOrElse(versionPattern.split(version).slice(0, 3).mkString("."),
+      throw new IllegalArgumentException(s"Version `$version` is not a valid version"))
 
   def latestVersion = versionNameMap.values.max
+
 }
 
 sealed trait ApiVersion extends Ordered[ApiVersion] {
@@ -61,13 +66,8 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
   val messageFormatVersion: Byte
   val id: Int
 
-  override def compare(that: ApiVersion): Int = {
+  override def compare(that: ApiVersion): Int =
     ApiVersion.orderingByVersion.compare(this, that)
-  }
-
-  def onOrAfter(that: ApiVersion): Boolean = {
-    this.compare(that) >= 0
-  }
 
   override def toString(): String = version
 }

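A note on the ApiVersion change above: the custom onOrAfter helper is removed in favour of the comparison operators ApiVersion already gets from Ordered[ApiVersion], and apply now fails with IllegalArgumentException for unknown versions instead of a bare NoSuchElementException. A small sketch of the resulting call sites (illustrative; assumes the existing KAFKA_0_9_0 and KAFKA_0_10_0_IV0 objects):

    import kafka.api.{ApiVersion, KAFKA_0_9_0, KAFKA_0_10_0_IV0}

    // Version strings are truncated to three components before the lookup.
    val v = ApiVersion("0.10.0.0")         // resolves to KAFKA_0_10_0_IV0
    assert(v >= KAFKA_0_9_0)               // replaces v.onOrAfter(KAFKA_0_9_0)
    assert(ApiVersion.latestVersion >= v)
    // ApiVersion("not-a-version") now throws IllegalArgumentException
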
http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/common/LongRef.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/LongRef.scala 
b/core/src/main/scala/kafka/common/LongRef.scala
new file mode 100644
index 0000000..f2b1e32
--- /dev/null
+++ b/core/src/main/scala/kafka/common/LongRef.scala
@@ -0,0 +1,61 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.common
+
+/**
+  * A mutable cell that holds a value of type `Long`. One should generally prefer using value-based programming (i.e.
+  * passing and returning `Long` values), but this class can be useful in some scenarios.
+  *
+  * Unlike `AtomicLong`, this class is not thread-safe and there are no atomicity guarantees.
+  */
+class LongRef(var value: Long) {
+
+  def addAndGet(delta: Long): Long = {
+    value += delta
+    value
+  }
+
+  def getAndAdd(delta: Long): Long = {
+    val result = value
+    value += delta
+    result
+  }
+
+  def getAndIncrement(): Long = {
+    val v = value
+    value += 1
+    v
+  }
+
+  def incrementAndGet(): Long = {
+    value += 1
+    value
+  }
+
+  def getAndDecrement(): Long = {
+    val v = value
+    value -= 1
+    v
+  }
+
+  def decrementAndGet(): Long = {
+    value -= 1
+    value
+  }
+
+}

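LongRef above replaces AtomicLong as the offset counter that is threaded through message validation (see the Log.scala and ByteBufferMessageSet.scala changes below); the counter is only touched from one thread at a time, which is why the class explicitly makes no atomicity guarantees. A minimal usage sketch with made-up values:

    import kafka.common.LongRef

    val offsetCounter = new LongRef(100L)           // next offset to assign
    val first = offsetCounter.getAndIncrement()     // returns 100, counter is now 101
    val second = offsetCounter.getAndIncrement()    // returns 101, counter is now 102
    val end = offsetCounter.addAndGet(10)           // reserves ten more offsets, returns 112
    assert(first == 100L && second == 101L && end == 112L && offsetCounter.value == 112L)
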
http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 513f383..3b1a458 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -380,7 +380,7 @@ class ControllerBrokerRequestBatch(controller: 
KafkaController) extends  Logging
           topicPartition -> partitionState
         }
 
-        val version = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_0_9_0)) (1: Short) else (0: Short)
+        val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) (1: Short) else (0: Short)
 
         val updateMetadataRequest =
           if (version == 0) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index c86e87b..cb08358 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -724,10 +724,10 @@ object GroupCoordinator {
   // TODO: we store both group metadata and offset data here despite the topic name being offsets only
   val GroupMetadataTopicName = "__consumer_offsets"
 
-  def create(config: KafkaConfig,
-             zkUtils: ZkUtils,
-             replicaManager: ReplicaManager,
-             time: Time): GroupCoordinator = {
+  def apply(config: KafkaConfig,
+            zkUtils: ZkUtils,
+            replicaManager: ReplicaManager,
+            time: Time): GroupCoordinator = {
     val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
       loadBufferSize = config.offsetsLoadBufferSize,
       offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala 
b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index df30279..590db83 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -16,8 +16,9 @@
 */
 package kafka.javaapi.message
 
-import java.util.concurrent.atomic.AtomicLong
 import java.nio.ByteBuffer
+
+import kafka.common.LongRef
 import kafka.message._
 
 import scala.collection.JavaConverters._
@@ -26,7 +27,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
   private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer)
   
   def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), messages.asScala: _*).buffer)
+    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new LongRef(0), messages.asScala: _*).buffer)
   }
 
   def this(messages: java.util.List[Message]) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala 
b/core/src/main/scala/kafka/log/FileMessageSet.scala
index fe31ad4..45b3df9 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -174,16 +174,17 @@ class FileMessageSet private[kafka](@volatile var file: 
File,
   }
 
   /**
-    * This method is called before we write messages to socket use zero-copy transfer. We need to
-    * make sure all the messages in the message set has expected magic value
+    * This method is called before we write messages to the socket using zero-copy transfer. We need to
+    * make sure all the messages in the message set have the expected magic value.
+    *
     * @param expectedMagicValue the magic value expected
-    * @return true if all messages has expected magic value, false otherwise
+    * @return true if all messages have expected magic value, false otherwise
     */
-  override def magicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
+  override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
     var location = start
     val offsetAndSizeBuffer = ByteBuffer.allocate(MessageSet.LogOverhead)
     val crcAndMagicByteBuffer = ByteBuffer.allocate(Message.CrcLength + 
Message.MagicLength)
-    while(location < end) {
+    while (location < end) {
       offsetAndSizeBuffer.rewind()
       channel.read(offsetAndSizeBuffer, location)
       if (offsetAndSizeBuffer.hasRemaining)
@@ -191,7 +192,7 @@ class FileMessageSet private[kafka](@volatile var file: 
File,
       offsetAndSizeBuffer.rewind()
       offsetAndSizeBuffer.getLong // skip offset field
       val messageSize = offsetAndSizeBuffer.getInt
-      if(messageSize < Message.MinMessageOverhead)
+      if (messageSize < Message.MinMessageOverhead)
         throw new IllegalStateException("Invalid message size: " + messageSize)
       crcAndMagicByteBuffer.rewind()
       channel.read(crcAndMagicByteBuffer, location + MessageSet.LogOverhead)
@@ -203,15 +204,15 @@ class FileMessageSet private[kafka](@volatile var file: 
File,
   }
 
   /**
-   * Convert this message set to use specified message format.
+   * Convert this message set to use the specified message format.
    */
   def toMessageFormat(toMagicValue: Byte): ByteBufferMessageSet = {
     val offsets = new ArrayBuffer[Long]
     val newMessages = new ArrayBuffer[Message]
-    this.iterator().foreach(messageAndOffset => {
+    this.foreach { messageAndOffset =>
       val message = messageAndOffset.message
       if (message.compressionCodec == NoCompressionCodec) {
-        newMessages += messageAndOffset.message.toFormatVersion(toMagicValue)
+        newMessages += message.toFormatVersion(toMagicValue)
         offsets += messageAndOffset.offset
       } else {
         // File message set only has shallow iterator. We need to do deep 
iteration here if needed.
@@ -221,19 +222,19 @@ class FileMessageSet private[kafka](@volatile var file: 
File,
           offsets += innerMessageAndOffset.offset
         }
       }
-    })
+    }
 
     // We use the offset seq to assign offsets so the offset of the messages does not change.
     new ByteBufferMessageSet(
       compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec),
-      offsetSeq = offsets.toSeq,
+      offsetSeq = offsets,
       newMessages: _*)
   }
 
   /**
    * Get a shallow iterator over the messages in the set.
    */
-  override def iterator() = iterator(Int.MaxValue)
+  override def iterator = iterator(Int.MaxValue)
 
   /**
    * Get an iterator over the messages in the set. We only do shallow 
iteration here.

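The FileMessageSet changes above rename magicValueInAllWrapperMessages to isMagicValueInAllWrapperMessages and tidy up toMessageFormat, which converts a file-backed message set to a given magic value (typically for down-converting when serving older consumers). A one-line sketch, assuming fileMessageSet is an existing FileMessageSet:

    // Convert every message (deep-iterating compressed wrappers) to the v0 format.
    val converted: ByteBufferMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V0)
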
http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index f8c0b77..fd176b1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -21,17 +21,16 @@ import kafka.utils._
 import kafka.message._
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{LogOffsetMetadata, FetchDataInfo, BrokerTopicStats}
-
-import java.io.{IOException, File}
+import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata}
+import java.io.{File, IOException}
 import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
 import java.util.concurrent.atomic._
 import java.text.NumberFormat
-import org.apache.kafka.common.errors.{OffsetOutOfRangeException, 
RecordBatchTooLargeException, RecordTooLargeException, CorruptRecordException}
+
+import org.apache.kafka.common.errors.{CorruptRecordException, 
OffsetOutOfRangeException, RecordBatchTooLargeException, 
RecordTooLargeException}
 import org.apache.kafka.common.record.TimestampType
 
 import scala.collection.JavaConversions
-
 import com.yammer.metrics.core.Gauge
 
 object LogAppendInfo {
@@ -320,7 +319,7 @@ class Log(val dir: File,
     val appendInfo = analyzeAndValidateMessageSet(messages)
 
     // if we have any valid messages, append them to the log
-    if(appendInfo.shallowCount == 0)
+    if (appendInfo.shallowCount == 0)
       return appendInfo
 
     // trim any invalid bytes or partial messages before appending it to the 
on-disk log
@@ -333,42 +332,46 @@ class Log(val dir: File,
 
         if (assignOffsets) {
           // assign offsets to the message set
-          val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
-          val now = SystemTime.milliseconds
-          try {
-            validMessages = validMessages.validateMessagesAndAssignOffsets(offset,
-                                                                           now,
-                                                                           appendInfo.sourceCodec,
-                                                                           appendInfo.targetCodec,
-                                                                           config.compact,
-                                                                           config.messageFormatVersion,
-                                                                           config.messageTimestampType,
-                                                                           config.messageTimestampDifferenceMaxMs)
+          val offset = new LongRef(nextOffsetMetadata.messageOffset)
+          val now = time.milliseconds
+          val (validatedMessages, messageSizesMaybeChanged) = try {
+            validMessages.validateMessagesAndAssignOffsets(offset,
+                                                           now,
+                                                           appendInfo.sourceCodec,
+                                                           appendInfo.targetCodec,
+                                                           config.compact,
+                                                           config.messageFormatVersion.messageFormatVersion,
+                                                           config.messageTimestampType,
+                                                           config.messageTimestampDifferenceMaxMs)
           } catch {
             case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
           }
-          appendInfo.lastOffset = offset.get - 1
-          // If log append time is used, we put the timestamp assigned to the messages in the append info.
+          validMessages = validatedMessages
+          appendInfo.lastOffset = offset.value - 1
           if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
             appendInfo.timestamp = now
+
+          // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
+          // format conversion)
+          if (messageSizesMaybeChanged) {
+            for (messageAndOffset <- validMessages.shallowIterator) {
+              if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
+                // we record the original message set size instead of the trimmed size
+                // to be consistent with pre-compression bytesRejectedRate recording
+                BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
+                BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
+                throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
+                  .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
+              }
+            }
+          }
+
         } else {
           // we are taking the offsets we are given
           if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < 
nextOffsetMetadata.messageOffset)
             throw new IllegalArgumentException("Out of order offsets found in 
" + messages)
         }
 
-        // re-validate message sizes since after re-compression some may exceed the limit
-        for (messageAndOffset <- validMessages.shallowIterator) {
-          if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
-            // we record the original message set size instead of trimmed size
-            // to be consistent with pre-compression bytesRejectedRate recording
-            BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
-            BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
-            throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
-              .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
-          }
-        }
-
         // check messages set size may be exceed config.segmentSize
         if (validMessages.sizeInBytes > config.segmentSize) {
           throw new RecordBatchTooLargeException("Message set size is %d bytes 
which exceeds the maximum configured segment size of %d."

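The key behavioural change in Log.append above: validateMessagesAndAssignOffsets now returns both the validated message set and a flag saying whether message sizes may have changed (re-compression or format conversion), and the per-message size re-check only runs when that flag is set. A rough sketch of the new call shape (illustrative values; the method is private[kafka], so this assumes code in the kafka package):

    import kafka.common.LongRef
    import kafka.message._
    import org.apache.kafka.common.record.TimestampType

    val maxMessageSize = 1000000   // hypothetical limit, normally config.maxMessageSize
    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes))
    val (validated, sizesMayHaveChanged) = messages.validateMessagesAndAssignOffsets(
      offsetCounter = new LongRef(42L),
      now = System.currentTimeMillis(),
      sourceCodec = NoCompressionCodec,
      targetCodec = NoCompressionCodec,
      compactedTopic = false,
      messageFormatVersion = Message.MagicValue_V1,
      messageTimestampType = TimestampType.CREATE_TIME,
      messageTimestampDiffMaxMs = Long.MaxValue)
    // Only re-check sizes when conversion or re-compression could have grown a message.
    if (sizesMayHaveChanged)
      validated.shallowIterator.foreach { messageAndOffset =>
        require(MessageSet.entrySize(messageAndOffset.message) <= maxMessageSize)
      }
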
http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index a3aff15..a2e1913 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -370,7 +370,7 @@ private[log] class Cleaner(val id: Int,
         val retainDeletes = old.lastModified > deleteHorizonMs
         info("Cleaning segment %s in log %s (last modified %s) into %s, %s 
deletes."
             .format(old.baseOffset, log.name, new Date(old.lastModified), 
cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
-        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, 
log.config.messageFormatVersion)
+        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, 
log.config.messageFormatVersion.messageFormatVersion)
       }
 
       // trim excess index
@@ -430,28 +430,30 @@ private[log] class Cleaner(val id: Int,
           }
           messagesRead += 1
         } else {
-          // We use absolute offset to decide whether retain the message or not. This is handled by
+          // We use the absolute offset to decide whether to retain the message or not. This is handled by the
           // deep iterator.
           val messages = ByteBufferMessageSet.deepIterator(entry)
-          var numberOfInnerMessages = 0
-          var formatConversionNeeded = false
-          val retainedMessages = messages.filter(messageAndOffset => {
+          var writeOriginalMessageSet = true
+          val retainedMessages = new mutable.ArrayBuffer[MessageAndOffset]
+          messages.foreach { messageAndOffset =>
             messagesRead += 1
-            numberOfInnerMessages += 1
-            if (messageAndOffset.message.magic != messageFormatVersion)
-              formatConversionNeeded = true
-            shouldRetainMessage(source, map, retainDeletes, messageAndOffset)
-          }).toSeq
-
-          // There is no messages compacted out and no message format 
conversion, write the original message set back
-          if (retainedMessages.size == numberOfInnerMessages && 
!formatConversionNeeded)
-            ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, 
entry.offset)
-          else if (retainedMessages.nonEmpty) {
-            val convertedRetainedMessages = 
retainedMessages.map(messageAndOffset => {
-              new 
MessageAndOffset(messageAndOffset.message.toFormatVersion(messageFormatVersion),
 messageAndOffset.offset)
-            })
-            compressMessages(writeBuffer, entry.message.compressionCodec, 
messageFormatVersion, convertedRetainedMessages)
+            if (shouldRetainMessage(source, map, retainDeletes, 
messageAndOffset)) {
+              retainedMessages += {
+                if (messageAndOffset.message.magic != messageFormatVersion) {
+                  writeOriginalMessageSet = false
+                  new 
MessageAndOffset(messageAndOffset.message.toFormatVersion(messageFormatVersion),
 messageAndOffset.offset)
+                }
+                else messageAndOffset
+              }
+            }
+            else writeOriginalMessageSet = false
           }
+
+          // There are no messages compacted out and no message format conversion, write the original message set back
+          if (writeOriginalMessageSet)
+            ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, 
entry.offset)
+          else if (retainedMessages.nonEmpty)
+            compressMessages(writeBuffer, entry.message.compressionCodec, 
messageFormatVersion, retainedMessages)
         }
       }
 
@@ -474,27 +476,27 @@ private[log] class Cleaner(val id: Int,
   private def compressMessages(buffer: ByteBuffer,
                                compressionCodec: CompressionCodec,
                                messageFormatVersion: Byte,
-                               messages: Seq[MessageAndOffset]) {
-    val messagesIterable = messages.toIterable.map(_.message)
-    if (messages.isEmpty) {
+                               messageAndOffsets: Seq[MessageAndOffset]) {
+    val messages = messageAndOffsets.map(_.message)
+    if (messageAndOffsets.isEmpty) {
       MessageSet.Empty.sizeInBytes
     } else if (compressionCodec == NoCompressionCodec) {
-      for(messageOffset <- messages)
+      for (messageOffset <- messageAndOffsets)
         ByteBufferMessageSet.writeMessage(buffer, messageOffset.message, 
messageOffset.offset)
-      MessageSet.messageSetSize(messagesIterable)
+      MessageSet.messageSetSize(messages)
     } else {
-      val magicAndTimestamp = 
MessageSet.magicAndLargestTimestamp(messages.map(_.message))
-      val firstAbsoluteOffset = messages.head.offset
+      val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages)
+      val firstMessageOffset = messageAndOffsets.head
+      val firstAbsoluteOffset = firstMessageOffset.offset
       var offset = -1L
-      val timestampType = messages.head.message.timestampType
-      val messageWriter = new 
MessageWriter(math.min(math.max(MessageSet.messageSetSize(messagesIterable) / 
2, 1024), 1 << 16))
+      val timestampType = firstMessageOffset.message.timestampType
+      val messageWriter = new 
MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 
1 << 16))
       messageWriter.write(codec = compressionCodec, timestamp = 
magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = 
messageFormatVersion) { outputStream =>
         val output = new DataOutputStream(CompressionFactory(compressionCodec, 
outputStream))
         try {
-          for (messageOffset <- messages) {
+          for (messageOffset <- messageAndOffsets) {
             val message = messageOffset.message
             offset = messageOffset.offset
-            // Use inner offset when magic value is greater than 0
             if (messageFormatVersion > Message.MagicValue_V0) {
               // The offset of the messages are absolute offset, compute the 
inner offset.
               val innerOffset = messageOffset.offset - firstAbsoluteOffset

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala 
b/core/src/main/scala/kafka/log/LogConfig.scala
index a8fffbd..a76dce7 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -19,6 +19,8 @@ package kafka.log
 
 import java.util.Properties
 
+import scala.collection.JavaConverters._
+
 import kafka.api.ApiVersion
 import kafka.message.{BrokerCompressionCodec, Message}
 import kafka.server.KafkaConfig
@@ -73,9 +75,9 @@ case class LogConfig(props: java.util.Map[_, _]) extends 
AbstractConfig(LogConfi
   val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
   val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase
   val preallocate = getBoolean(LogConfig.PreAllocateEnableProp)
-  val messageFormatVersion = ApiVersion(getString(LogConfig.MessageFormatVersionProp)).messageFormatVersion
+  val messageFormatVersion = ApiVersion(getString(LogConfig.MessageFormatVersionProp))
   val messageTimestampType = TimestampType.forName(getString(LogConfig.MessageTimestampTypeProp))
-  val messageTimestampDifferenceMaxMs = getLong(LogConfig.MessageTimestampDifferenceMaxMsProp)
+  val messageTimestampDifferenceMaxMs = getLong(LogConfig.MessageTimestampDifferenceMaxMsProp).longValue
 
   def randomSegmentJitter: Long =
     if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % 
math.min(segmentJitterMs, segmentMs)
@@ -177,12 +179,8 @@ object LogConfig {
 
   def apply(): LogConfig = LogConfig(new Properties())
 
-  def configNames() = {
-    import scala.collection.JavaConversions._
-    configDef.names().toList.sorted
-  }
-
-
+  def configNames: Seq[String] = configDef.names.asScala.toSeq.sorted
+  
   /**
    * Create a log config instance using the given properties and defaults
    */
@@ -197,10 +195,8 @@ object LogConfig {
    * Check that property names are valid
    */
   def validateNames(props: Properties) {
-    import scala.collection.JavaConversions._
-    val names = configDef.names()
-    for(name <- props.keys)
-      require(names.contains(name), "Unknown configuration \"%s\".".format(name))
+    val names = configNames
+    for (name <- props.keys.asScala) require(names.contains(name), s"Unknown configuration `$name`.")
   }
 
   /**

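A note on the LogConfig change above: message.format.version is now parsed into an ApiVersion rather than directly into the magic byte, which is why call sites elsewhere in this patch read config.messageFormatVersion.messageFormatVersion. A small sketch with hypothetical properties:

    import java.util.Properties
    import kafka.log.LogConfig

    val props = new Properties()
    props.put(LogConfig.MessageFormatVersionProp, "0.10.0")
    val config = LogConfig(props)
    val magic: Byte = config.messageFormatVersion.messageFormatVersion   // the on-disk magic value
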
http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 69386c1..b64fac6 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -19,11 +19,13 @@ package kafka.log
 
 import java.io._
 import java.util.concurrent.TimeUnit
+
 import kafka.utils._
+
 import scala.collection._
-import kafka.common.{TopicAndPartition, KafkaException}
-import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, 
OffsetCheckpoint}
-import java.util.concurrent.{Executors, ExecutorService, ExecutionException, 
Future}
+import kafka.common.{KafkaException, TopicAndPartition}
+import kafka.server.{BrokerState, OffsetCheckpoint, 
RecoveringFromUncleanShutdown}
+import java.util.concurrent.{ExecutionException, ExecutorService, Executors, 
Future}
 
 /**
  * The entry point to the kafka log management subsystem. The log manager is 
responsible for log creation, retrieval, and cleaning.
@@ -463,7 +465,7 @@ class LogManager(val logDirs: Array[File],
   /**
    * Get a map of TopicAndPartition => Log
    */
-  def logsByTopicPartition = logs.toMap
+  def logsByTopicPartition: Map[TopicAndPartition, Log] = logs.toMap
 
   /**
    * Map of log dir to logs by topic and partitions in that dir

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 2867c78..856f971 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -18,33 +18,33 @@
 package kafka.message
 
 import kafka.utils.{IteratorTemplate, Logging}
-import kafka.common.KafkaException
-
+import kafka.common.{KafkaException, LongRef}
 import java.nio.ByteBuffer
 import java.nio.channels._
 import java.io._
-import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+import java.util.ArrayDeque
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
 
 import org.apache.kafka.common.errors.InvalidTimestampException
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
 
 object ByteBufferMessageSet {
 
-  private def create(offsetAssignor: OffsetAssigner,
+  private def create(offsetAssigner: OffsetAssigner,
                      compressionCodec: CompressionCodec,
                      wrapperMessageTimestamp: Option[Long],
                      timestampType: TimestampType,
                      messages: Message*): ByteBuffer = {
-    if(messages.size == 0) {
+    if (messages.isEmpty)
       MessageSet.Empty.buffer
-    } else if(compressionCodec == NoCompressionCodec) {
+    else if (compressionCodec == NoCompressionCodec) {
       val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
-      for(message <- messages)
-        writeMessage(buffer, message, offsetAssignor.nextAbsoluteOffset)
+      for (message <- messages) writeMessage(buffer, message, 
offsetAssigner.nextAbsoluteOffset())
       buffer.rewind()
       buffer
     } else {
@@ -58,12 +58,12 @@ object ByteBufferMessageSet {
         val output = new DataOutputStream(CompressionFactory(compressionCodec, 
outputStream))
         try {
           for (message <- messages) {
-            offset = offsetAssignor.nextAbsoluteOffset
+            offset = offsetAssigner.nextAbsoluteOffset()
             if (message.magic != magicAndTimestamp.magic)
               throw new IllegalArgumentException("Messages in the message set 
must have same magic value")
             // Use inner offset if magic value is greater than 0
             if (magicAndTimestamp.magic > Message.MagicValue_V0)
-              output.writeLong(offsetAssignor.toInnerOffset(offset))
+              output.writeLong(offsetAssigner.toInnerOffset(offset))
             else
               output.writeLong(offset)
             output.writeInt(message.size)
@@ -87,24 +87,22 @@ object ByteBufferMessageSet {
 
     new IteratorTemplate[MessageAndOffset] {
 
-      val wrapperMessageOffset = wrapperMessageAndOffset.offset
-      val wrapperMessage = wrapperMessageAndOffset.message
+      val MessageAndOffset(wrapperMessage, wrapperMessageOffset) = 
wrapperMessageAndOffset
       val wrapperMessageTimestampOpt: Option[Long] =
         if (wrapperMessage.magic > MagicValue_V0) 
Some(wrapperMessage.timestamp) else None
       val wrapperMessageTimestampTypeOpt: Option[TimestampType] =
         if (wrapperMessage.magic > MagicValue_V0) 
Some(wrapperMessage.timestampType) else None
       if (wrapperMessage.payload == null)
-        throw new KafkaException("Message payload is null: " + wrapperMessage)
-      val inputStream: InputStream = new 
ByteBufferBackedInputStream(wrapperMessage.payload)
-      val compressed: DataInputStream = new 
DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, 
inputStream))
+        throw new KafkaException(s"Message payload is null: $wrapperMessage")
+      val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
+      val compressed = new 
DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, 
inputStream))
       var lastInnerOffset = -1L
 
       val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > 
MagicValue_V0) {
-        var innerMessageAndOffsets = new mutable.Queue[MessageAndOffset]()
+        val innerMessageAndOffsets = new ArrayDeque[MessageAndOffset]()
         try {
-          while (true) {
-            innerMessageAndOffsets += readMessageFromStream()
-          }
+          while (true)
+            innerMessageAndOffsets.add(readMessageFromStream())
         } catch {
           case eofe: EOFException =>
             compressed.close()
@@ -112,23 +110,18 @@ object ByteBufferMessageSet {
             throw new KafkaException(ioe)
         }
         Some(innerMessageAndOffsets)
-      } else {
-        None
-      }
+      } else None
 
-      private def readMessageFromStream() = {
-        // read the offset
+      private def readMessageFromStream(): MessageAndOffset = {
         val innerOffset = compressed.readLong()
-        // read record size
-        val size = compressed.readInt()
+        val recordSize = compressed.readInt()
 
-        if (size < MinMessageOverhead)
-          throw new InvalidMessageException("Message found with corrupt size 
(" + size + ") in deep iterator")
+        if (recordSize < MinMessageOverhead)
+          throw new InvalidMessageException(s"Message found with corrupt size 
`$recordSize` in deep iterator")
 
-        // read the record into an intermediate record buffer
-        // and hence has to do extra copy
-        val bufferArray = new Array[Byte](size)
-        compressed.readFully(bufferArray, 0, size)
+        // read the record into an intermediate record buffer (i.e. extra copy 
needed)
+        val bufferArray = new Array[Byte](recordSize)
+        compressed.readFully(bufferArray, 0, recordSize)
         val buffer = ByteBuffer.wrap(bufferArray)
 
         // Override the timestamp if necessary
@@ -146,20 +139,17 @@ object ByteBufferMessageSet {
         messageAndOffsets match {
           // Using inner offset and timestamps
           case Some(innerMessageAndOffsets) =>
-            if (innerMessageAndOffsets.isEmpty)
-              allDone()
-            else {
-              val messageAndOffset = innerMessageAndOffsets.dequeue()
-              val message = messageAndOffset.message
-              val relativeOffset = messageAndOffset.offset - lastInnerOffset
-              val absoluteOffset = wrapperMessageOffset + relativeOffset
-              new MessageAndOffset(message, absoluteOffset)
+            innerMessageAndOffsets.pollFirst() match {
+              case null => allDone()
+              case MessageAndOffset(message, offset) =>
+                val relativeOffset = offset - lastInnerOffset
+                val absoluteOffset = wrapperMessageOffset + relativeOffset
+                new MessageAndOffset(message, absoluteOffset)
             }
           // Not using inner offset and timestamps
           case None =>
-            try {
-              readMessageFromStream()
-            } catch {
+            try readMessageFromStream()
+            catch {
               case eofe: EOFException =>
                 compressed.close()
                 allDone()
@@ -185,17 +175,23 @@ object ByteBufferMessageSet {
   }
 }
 
+private object OffsetAssigner {
+
+  def apply(offsetCounter: LongRef, size: Int): OffsetAssigner =
+    new OffsetAssigner(offsetCounter.value to offsetCounter.addAndGet(size))
+
+}
+
 private class OffsetAssigner(offsets: Seq[Long]) {
-  val index = new AtomicInteger(0)
+  private var index = 0
 
-  def this(offsetCounter: AtomicLong, size: Int) {
-    this((offsetCounter.get() to offsetCounter.get + size).toSeq)
-    offsetCounter.addAndGet(size)
+  def nextAbsoluteOffset(): Long = {
+    val result = offsets(index)
+    index += 1
+    result
   }
 
-  def nextAbsoluteOffset = offsets(index.getAndIncrement)
-
-  def toInnerOffset(offset: Long) = offset - offsets(0)
+  def toInnerOffset(offset: Long): Long = offset - offsets.head
 
 }
 
@@ -209,17 +205,17 @@ private class OffsetAssigner(offsets: Seq[Long]) {
  * Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
  *
  *
- * When message format v1 is used, there will be following message format changes.
- * - For non-compressed messages, with message v1 we are adding timestamp and timestamp type attribute. The offsets of
+ * Message format v1 has the following changes:
+ * - For non-compressed messages, timestamp and timestamp type attributes have been added. The offsets of
  *   the messages remain absolute offsets.
- * - For Compressed messages, with message v1 we are adding timestamp, timestamp type attribute bit and using
- *   inner offsets (IO) for inner messages of compressed messages (see offset calculation details below). Timestamp type
- *   attribute is only set in wrapper messages. Inner messages always have CreateTime as timestamp type in attributes.
+ * - For compressed messages, timestamp and timestamp type attributes have been added and inner offsets (IO) are used
+ *   for inner messages of compressed messages (see offset calculation details below). The timestamp type
+ *   attribute is only set in wrapper messages. Inner messages always have CreateTime as the timestamp type in attributes.
  *
- * The way timestamp set is following:
- * For non-compressed messages: timestamp and timestamp type attribute in the messages are set and used.
+ * We set the timestamp in the following way:
+ * For non-compressed messages: the timestamp and timestamp type message attributes are set and used.
  * For compressed messages:
- * 1. Wrapper messages' timestamp type attribute is set to proper value
+ * 1. Wrapper messages' timestamp type attribute is set to the proper value
  * 2. Wrapper messages' timestamp is set to:
  *    - the max timestamp of inner messages if CreateTime is used
  *    - the current server time if wrapper message's timestamp = LogAppendTime.
@@ -227,11 +223,10 @@ private class OffsetAssigner(offsets: Seq[Long]) {
  * 3. Inner messages' timestamp will be:
  *    - used when wrapper message's timestamp type is CreateTime
  *    - ignored when wrapper message's timestamp type is LogAppendTime
- * 4. Inner messages' timestamp type will always be ignored. However, producer must set the inner message timestamp
- *    type to CreateTime, otherwise the messages will be rejected by broker.
+ * 4. Inner messages' timestamp type will always be ignored with one exception: producers must set the inner message
+ *    timestamp type to CreateTime, otherwise the messages will be rejected by broker.
  *
- *
- * The way absolute offset calculated is the following:
+ * Absolute offsets are calculated in the following way:
  * Ideally the conversion from relative offset(RO) to absolute offset(AO) should be:
  *
  * AO = AO_Of_Last_Inner_Message + RO
@@ -240,7 +235,7 @@ private class OffsetAssigner(offsets: Seq[Long]) {
  * And the relative offset of an inner message compared with the last inner message is not known until
  * the last inner message is written.
  * Unfortunately we are not able to change the previously written messages after the last message is written to
- * the message set when stream compressing is used.
+ * the message set when stream compression is used.
  *
  * To solve this issue, we use the following solution:
  *
@@ -254,20 +249,23 @@ private class OffsetAssigner(offsets: Seq[Long]) {
  *    RO = IO_of_a_message - IO_of_the_last_message
  *    AO = AO_Of_Last_Inner_Message + RO
  *
- * 4. This solution works for compacted message set as well
+ * 4. This solution works for compacted message sets as well.
  *
  */
 class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with 
Logging {
   private var shallowValidByteCount = -1
 
-  def this(compressionCodec: CompressionCodec, messages: Message*) {
-    this(ByteBufferMessageSet.create(new OffsetAssigner(new AtomicLong(0), 
messages.size), compressionCodec,
-      None, TimestampType.CREATE_TIME, messages:_*))
+  private[kafka] def this(compressionCodec: CompressionCodec,
+                          offsetCounter: LongRef,
+                          wrapperMessageTimestamp: Option[Long],
+                          timestampType: TimestampType,
+                          messages: Message*) {
+    this(ByteBufferMessageSet.create(OffsetAssigner(offsetCounter, 
messages.size), compressionCodec,
+      wrapperMessageTimestamp, timestampType, messages:_*))
   }
 
-  def this(compressionCodec: CompressionCodec, offsetCounter: AtomicLong, 
messages: Message*) {
-    this(ByteBufferMessageSet.create(new OffsetAssigner(offsetCounter, 
messages.size), compressionCodec,
-      None, TimestampType.CREATE_TIME, messages:_*))
+  def this(compressionCodec: CompressionCodec, offsetCounter: LongRef, 
messages: Message*) {
+    this(compressionCodec, offsetCounter, None, TimestampType.CREATE_TIME, 
messages:_*)
   }
 
   def this(compressionCodec: CompressionCodec, offsetSeq: Seq[Long], messages: 
Message*) {
@@ -275,31 +273,21 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) 
extends MessageSet with Loggi
       None, TimestampType.CREATE_TIME, messages:_*))
   }
 
-  def this(messages: Message*) {
-    this(NoCompressionCodec, new AtomicLong(0), messages: _*)
+  def this(compressionCodec: CompressionCodec, messages: Message*) {
+    this(compressionCodec, new LongRef(0L), messages: _*)
   }
 
-  // This constructor is only used internally
-  private[kafka] def this(compressionCodec: CompressionCodec,
-                          offsetCounter: AtomicLong,
-                          wrapperMessageTimestamp: Option[Long],
-                          timestampType: TimestampType,
-                          messages: Message*) {
-    this(ByteBufferMessageSet.create(new OffsetAssigner(offsetCounter, 
messages.size), compressionCodec,
-      wrapperMessageTimestamp, timestampType, messages:_*))
+  def this(messages: Message*) {
+    this(NoCompressionCodec, messages: _*)
   }
 
   def getBuffer = buffer
 
   private def shallowValidBytes: Int = {
-    if(shallowValidByteCount < 0) {
-      var bytes = 0
-      val iter = this.internalIterator(true)
-      while(iter.hasNext) {
-        val messageAndOffset = iter.next
-        bytes += MessageSet.entrySize(messageAndOffset.message)
-      }
-      this.shallowValidByteCount = bytes
+    if (shallowValidByteCount < 0) {
+      this.shallowValidByteCount = this.internalIterator(isShallow = true).map 
{ messageAndOffset =>
+        MessageSet.entrySize(messageAndOffset.message)
+      }.sum
     }
     shallowValidByteCount
   }
@@ -315,7 +303,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends 
MessageSet with Loggi
     written
   }
 
-  override def magicValueInAllWrapperMessages(expectedMagicValue: Byte): 
Boolean = {
+  override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): 
Boolean = {
     for (messageAndOffset <- shallowIterator) {
       if (messageAndOffset.message.magic != expectedMagicValue)
         return false
@@ -398,27 +386,28 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) 
extends MessageSet with Loggi
    *
    * If no format conversion or value overwriting is required for messages, 
this method will perform in-place
    * operations and avoid re-compression.
+   *
+   * Returns the message set and a boolean indicating whether the message sizes may have changed.
    */
-  private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: 
AtomicLong,
-                                                      now: Long = 
System.currentTimeMillis(),
+  private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: LongRef,
+                                                      now: Long,
                                                       sourceCodec: 
CompressionCodec,
                                                       targetCodec: 
CompressionCodec,
                                                       compactedTopic: Boolean 
= false,
                                                       messageFormatVersion: 
Byte = Message.CurrentMagicValue,
                                                       messageTimestampType: 
TimestampType,
-                                                      
messageTimestampDiffMaxMs: Long): ByteBufferMessageSet = {
+                                                      
messageTimestampDiffMaxMs: Long): (ByteBufferMessageSet, Boolean) = {
     if (sourceCodec == NoCompressionCodec && targetCodec == 
NoCompressionCodec) {
       // check the magic value
-      if (!magicValueInAllWrapperMessages(messageFormatVersion)) {
+      if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) {
         // Message format conversion
-        convertNonCompressedMessages(offsetCounter, compactedTopic, now, 
messageTimestampType, messageTimestampDiffMaxMs,
-          messageFormatVersion)
+        (convertNonCompressedMessages(offsetCounter, compactedTopic, now, 
messageTimestampType, messageTimestampDiffMaxMs,
+          messageFormatVersion), true)
       } else {
         // Do in-place validation, offset assignment and maybe set timestamp
-        validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, 
now, compactedTopic, messageTimestampType,
-          messageTimestampDiffMaxMs)
+        (validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, 
now, compactedTopic, messageTimestampType,
+          messageTimestampDiffMaxMs), false)
       }
-
     } else {
       // Deal with compressed messages
       // We cannot do in place assignment in one of the following situations:
@@ -432,8 +421,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends 
MessageSet with Loggi
 
       var maxTimestamp = Message.NoTimestamp
       val expectedInnerOffset = new AtomicLong(0)
-      val validatedMessages = new ListBuffer[Message]
-      this.internalIterator(isShallow = false).foreach(messageAndOffset => {
+      val validatedMessages = new mutable.ArrayBuffer[Message]
+      this.internalIterator(isShallow = false).foreach { messageAndOffset =>
         val message = messageAndOffset.message
         validateMessageKey(message, compactedTopic)
         if (message.magic > Message.MagicValue_V0 && messageFormatVersion > 
Message.MagicValue_V0) {
@@ -441,7 +430,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends 
MessageSet with Loggi
           // Validate the timestamp
           validateTimestamp(message, now, messageTimestampType, 
messageTimestampDiffMaxMs)
           // Check if we need to overwrite offset
-          if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement)
+          if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement())
             inPlaceAssignment = false
           maxTimestamp = math.max(maxTimestamp, message.timestamp)
         }
@@ -451,7 +440,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends 
MessageSet with Loggi
           inPlaceAssignment = false
 
         validatedMessages += message.toFormatVersion(messageFormatVersion)
-      })
+      }
       if (!inPlaceAssignment) {
         // Cannot do in place assignment.
         val wrapperMessageTimestamp = {
@@ -463,11 +452,11 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) 
extends MessageSet with Loggi
             Some(now)
         }
 
-        new ByteBufferMessageSet(compressionCodec = targetCodec,
-                                 offsetCounter = offsetCounter,
-                                 wrapperMessageTimestamp = 
wrapperMessageTimestamp,
-                                 timestampType = messageTimestampType,
-                                 messages = validatedMessages.toBuffer: _*)
+        (new ByteBufferMessageSet(compressionCodec = targetCodec,
+                                  offsetCounter = offsetCounter,
+                                  wrapperMessageTimestamp = 
wrapperMessageTimestamp,
+                                  timestampType = messageTimestampType,
+                                  messages = validatedMessages: _*), true)
       } else {
         // Do not do re-compression but simply update the offset, timestamp 
and attributes field of the wrapper message.
         buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1)
@@ -480,53 +469,49 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) 
extends MessageSet with Loggi
         val timestamp = buffer.getLong(timestampOffset)
         val attributes = buffer.get(attributeOffset)
         if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == 
maxTimestamp)
-            // We don't need to recompute crc if the timestamp is not updated.
-            crcUpdateNeeded = false
+          // We don't need to recompute crc if the timestamp is not updated.
+          crcUpdateNeeded = false
         else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) {
           // Set timestamp type and timestamp
           buffer.putLong(timestampOffset, now)
-          buffer.put(attributeOffset, 
TimestampType.setTimestampType(attributes, TimestampType.LOG_APPEND_TIME))
+          buffer.put(attributeOffset, 
messageTimestampType.updateAttributes(attributes))
         }
 
         if (crcUpdateNeeded) {
           // need to recompute the crc value
           buffer.position(MessageSet.LogOverhead)
           val wrapperMessage = new Message(buffer.slice())
-          Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + 
Message.CrcOffset, wrapperMessage.computeChecksum())
+          Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + 
Message.CrcOffset, wrapperMessage.computeChecksum)
         }
         buffer.rewind()
-        this
+        (this, false)
       }
     }
   }
 
-  // We create this method to save memory copy operation. It reads from the 
original message set and directly
+  // We create this method to avoid a memory copy. It reads from the original 
message set and directly
   // writes the converted messages into new message set buffer. Hence we don't 
need to allocate memory for each
   // individual message during message format conversion.
-  private def convertNonCompressedMessages(offsetCounter: AtomicLong,
+  private def convertNonCompressedMessages(offsetCounter: LongRef,
                                            compactedTopic: Boolean,
                                            now: Long,
                                            timestampType: TimestampType,
                                            messageTimestampDiffMaxMs: Long,
                                            toMagicValue: Byte): 
ByteBufferMessageSet = {
-    val sizeInBytesAfterConversion = shallowValidBytes + 
this.internalIterator(isShallow = true).foldLeft(0)(
-      (sizeDiff, messageAndOffset) => sizeDiff + 
Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue))
+    val sizeInBytesAfterConversion = shallowValidBytes + 
this.internalIterator(isShallow = true).map { messageAndOffset =>
+      Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue)
+    }.sum
     val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
     var newMessagePosition = 0
-    this.internalIterator(isShallow = true).foreach {messageAndOffset =>
-      val message = messageAndOffset.message
+    this.internalIterator(isShallow = true).foreach { case 
MessageAndOffset(message, _) =>
       validateMessageKey(message, compactedTopic)
       validateTimestamp(message, now, timestampType, messageTimestampDiffMaxMs)
       newBuffer.position(newMessagePosition)
-      // write offset.
-      newBuffer.putLong(offsetCounter.getAndIncrement)
-      // Write new message size
+      newBuffer.putLong(offsetCounter.getAndIncrement())
       val newMessageSize = message.size + 
Message.headerSizeDiff(message.magic, toMagicValue)
       newBuffer.putInt(newMessageSize)
-      // Create new message buffer
       val newMessageBuffer = newBuffer.slice()
       newMessageBuffer.limit(newMessageSize)
-      // Convert message
       message.convertToBuffer(toMagicValue, newMessageBuffer, now, 
timestampType)
 
       newMessagePosition += MessageSet.LogOverhead + newMessageSize
@@ -535,7 +520,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends 
MessageSet with Loggi
     new ByteBufferMessageSet(newBuffer)
   }
 
-  private def 
validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: AtomicLong,
+  private def 
validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: LongRef,
                                                                   now: Long,
                                                                   
compactedTopic: Boolean,
                                                                   
timestampType: TimestampType,
@@ -555,8 +540,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends 
MessageSet with Loggi
         validateTimestamp(message, now, timestampType, timestampDiffMaxMs)
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
           message.buffer.putLong(Message.TimestampOffset, now)
-          message.buffer.put(Message.AttributesOffset, 
TimestampType.setTimestampType(message.attributes, 
TimestampType.LOG_APPEND_TIME))
-          Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, 
message.computeChecksum())
+          message.buffer.put(Message.AttributesOffset, 
timestampType.updateAttributes(message.attributes))
+          Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, 
message.computeChecksum)
         }
       }
       messagePosition += MessageSet.LogOverhead + messageSize
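
The offsetCounter parameter above changes from java.util.concurrent.atomic.AtomicLong to the new kafka.common.LongRef introduced by this commit (the new file itself is not reproduced in this excerpt). Going only by the calls visible in the diff, getAndIncrement() and addAndGet(...), such a counter could look roughly like the sketch below: a plain mutable long with no atomicity, presumably acceptable because offset assignment is done from a single thread. This is an illustrative sketch, not the committed LongRef.scala.

    // Sketch only: an unsynchronized mutable long counter. Method names mirror
    // the calls used in ByteBufferMessageSet above; the real LongRef may differ.
    class LongRef(var value: Long) {

      // Add `delta` and return the new value
      // (used as offsetCounter.addAndGet(validatedMessages.size) above).
      def addAndGet(delta: Long): Long = {
        value += delta
        value
      }

      // Return the current value, then increment
      // (used as offsetCounter.getAndIncrement() above).
      def getAndIncrement(): Long = {
        val v = value
        value += 1
        v
      }
    }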

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala 
b/core/src/main/scala/kafka/message/Message.scala
index 51aa11a..2ab2e0c 100755
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -50,8 +50,8 @@ object Message {
   val ValueSizeLength = 4
 
   private val MessageHeaderSizeMap = Map (
-    0.asInstanceOf[Byte] -> (CrcLength + MagicLength + AttributesLength + 
KeySizeLength + ValueSizeLength),
-    1.asInstanceOf[Byte] -> (CrcLength + MagicLength + AttributesLength + 
TimestampLength + KeySizeLength + ValueSizeLength))
+    (0: Byte) -> (CrcLength + MagicLength + AttributesLength + KeySizeLength + 
ValueSizeLength),
+    (1: Byte) -> (CrcLength + MagicLength + AttributesLength + TimestampLength 
+ KeySizeLength + ValueSizeLength))
 
   /**
    * The amount of overhead bytes in a message
@@ -123,10 +123,10 @@ object Message {
  *
  * Default constructor wraps an existing ByteBuffer with the Message object 
with no change to the contents.
  * @param buffer the byte buffer of this message.
- * @param wrapperMessageTimestamp the wrapper message timestamp, only not None 
when the message is an inner message
- *                                of a compressed message.
- * @param wrapperMessageTimestampType the wrapper message timestamp type, only 
not None when the message is an inner
- *                                    message of a compressed message.
+ * @param wrapperMessageTimestamp the wrapper message timestamp, which is only 
defined when the message is an inner
+ *                                message of a compressed message.
+ * @param wrapperMessageTimestampType the wrapper message timestamp type, 
which is only defined when the message is an
+ *                                    inner message of a compressed message.
  */
 class Message(val buffer: ByteBuffer,
               private val wrapperMessageTimestamp: Option[Long] = None,
@@ -168,11 +168,10 @@ class Message(val buffer: ByteBuffer,
     // skip crc, we will fill that in at the end
     buffer.position(MagicOffset)
     buffer.put(magicValue)
-    var attributes: Byte = 0
-    if (codec.codec > 0) {
-      attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
-      attributes = TimestampType.setTimestampType(attributes, timestampType)
-    }
+    val attributes: Byte =
+      if (codec.codec > 0)
+        timestampType.updateAttributes((CompressionCodeMask & 
codec.codec).toByte)
+      else 0
     buffer.put(attributes)
     // Only put timestamp when "magic" value is greater than 0
     if (magic > MagicValue_V0)
@@ -213,7 +212,7 @@ class Message(val buffer: ByteBuffer,
   /**
    * Compute the checksum of the message from the message contents
    */
-  def computeChecksum(): Long = 
+  def computeChecksum: Long =
     CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset,  
buffer.limit - MagicOffset)
   
   /**
@@ -231,7 +230,7 @@ class Message(val buffer: ByteBuffer,
    */
   def ensureValid() {
     if(!isValid)
-      throw new InvalidMessageException("Message is corrupt (stored crc = " + 
checksum + ", computed crc = " + computeChecksum() + ")")
+      throw new InvalidMessageException(s"Message is corrupt (stored crc = 
${checksum}, computed crc = ${computeChecksum})")
   }
   
   /**
@@ -242,7 +241,7 @@ class Message(val buffer: ByteBuffer,
   /**
    * The position where the key size is stored.
    */
-  def keySizeOffset = {
+  private def keySizeOffset = {
     if (magic == MagicValue_V0) KeySizeOffset_V0
     else KeySizeOffset_V1
   }
@@ -260,7 +259,7 @@ class Message(val buffer: ByteBuffer,
   /**
    * The position where the payload size is stored
    */
-  def payloadSizeOffset = {
+  private def payloadSizeOffset = {
     if (magic == MagicValue_V0) KeyOffset_V0 + max(0, keySize)
     else KeyOffset_V1 + max(0, keySize)
   }
@@ -273,7 +272,7 @@ class Message(val buffer: ByteBuffer,
   /**
    * Is the payload of this message null
    */
-  def isNull(): Boolean = payloadSize < 0
+  def isNull: Boolean = payloadSize < 0
   
   /**
    * The magic version of this message
@@ -309,7 +308,7 @@ class Message(val buffer: ByteBuffer,
     if (magic == MagicValue_V0)
       TimestampType.NO_TIMESTAMP_TYPE
     else
-      
wrapperMessageTimestampType.getOrElse(TimestampType.getTimestampType(attributes))
+      
wrapperMessageTimestampType.getOrElse(TimestampType.forAttributes(attributes))
   }
   
   /**
@@ -337,23 +336,23 @@ class Message(val buffer: ByteBuffer,
     else {
       val byteBuffer = ByteBuffer.allocate(size + 
Message.headerSizeDiff(magic, toMagicValue))
       // Copy bytes from old messages to new message
-      convertToBuffer(toMagicValue, byteBuffer)
+      convertToBuffer(toMagicValue, byteBuffer, Message.NoTimestamp)
       new Message(byteBuffer)
     }
   }
 
   def convertToBuffer(toMagicValue: Byte,
                       byteBuffer: ByteBuffer,
-                      now: Long = NoTimestamp,
-                      timestampType: TimestampType = 
wrapperMessageTimestampType.getOrElse(TimestampType.getTimestampType(attributes)))
 {
+                      now: Long,
+                      timestampType: TimestampType = 
wrapperMessageTimestampType.getOrElse(TimestampType.forAttributes(attributes))) 
{
     if (byteBuffer.remaining() < size + headerSizeDiff(magic, toMagicValue))
       throw new IndexOutOfBoundsException("The byte buffer does not have 
enough capacity to hold new message format " +
-        "version " + toMagicValue)
+        s"version $toMagicValue")
     if (toMagicValue == Message.MagicValue_V1) {
       // Up-conversion, reserve CRC and update magic byte
       byteBuffer.position(Message.MagicOffset)
       byteBuffer.put(Message.MagicValue_V1)
-      byteBuffer.put(TimestampType.setTimestampType(attributes, timestampType))
+      byteBuffer.put(timestampType.updateAttributes(attributes))
       // Up-conversion, insert the timestamp field
       if (timestampType == TimestampType.LOG_APPEND_TIME)
         byteBuffer.putLong(now)
@@ -364,13 +363,13 @@ class Message(val buffer: ByteBuffer,
       // Down-conversion, reserve CRC and update magic byte
       byteBuffer.position(Message.MagicOffset)
       byteBuffer.put(Message.MagicValue_V0)
-      byteBuffer.put(TimestampType.setTimestampType(attributes, 
TimestampType.CREATE_TIME))
+      byteBuffer.put(TimestampType.CREATE_TIME.updateAttributes(attributes))
       // Down-conversion, skip the timestamp field
       byteBuffer.put(buffer.array(), buffer.arrayOffset() + 
Message.KeySizeOffset_V1, size - Message.KeySizeOffset_V1)
     }
     // update crc value
     val newMessage = new Message(byteBuffer)
-    Utils.writeUnsignedInt(byteBuffer, Message.CrcOffset, 
newMessage.computeChecksum())
+    Utils.writeUnsignedInt(byteBuffer, Message.CrcOffset, 
newMessage.computeChecksum)
     byteBuffer.rewind()
   }
 
@@ -382,7 +381,7 @@ class Message(val buffer: ByteBuffer,
     if(size < 0) {
       null
     } else {
-      var b = buffer.duplicate
+      var b = buffer.duplicate()
       b.position(start + 4)
       b = b.slice()
       b.limit(size)
@@ -396,9 +395,9 @@ class Message(val buffer: ByteBuffer,
    */
   private def validateTimestampAndMagicValue(timestamp: Long, magic: Byte) {
     if (magic != MagicValue_V0 && magic != MagicValue_V1)
-      throw new IllegalArgumentException("Invalid magic value " + magic)
+      throw new IllegalArgumentException(s"Invalid magic value $magic")
     if (timestamp < 0 && timestamp != NoTimestamp)
-      throw new IllegalArgumentException("Invalid message timestamp " + 
timestamp)
+      throw new IllegalArgumentException(s"Invalid message timestamp 
$timestamp")
     if (magic == MagicValue_V0 && timestamp != NoTimestamp)
       throw new IllegalArgumentException(s"Invalid timestamp $timestamp. 
Timestamp must be ${NoTimestamp} when magic = ${MagicValue_V0}")
   }
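
The recurring change from TimestampType.setTimestampType(attributes, ...) to timestampType.updateAttributes(attributes), and from TimestampType.getTimestampType(attributes) to TimestampType.forAttributes(attributes), moves the attributes-byte bookkeeping onto the TimestampType value itself. The sketch below shows roughly what that bookkeeping amounts to; the mask constant and the choice of bit are assumptions made for illustration and are not taken from the committed TimestampType code.

    // Illustrative only. Assumes a single attributes bit distinguishes
    // CreateTime (bit clear) from LogAppendTime (bit set).
    object TimestampTypeSketch {
      val TimestampTypeMask: Int = 0x08 // assumed bit position

      // analogue of timestampType.updateAttributes(attributes)
      def updateAttributes(attributes: Byte, logAppendTime: Boolean): Byte =
        if (logAppendTime) (attributes | TimestampTypeMask).toByte
        else (attributes & ~TimestampTypeMask).toByte

      // analogue of TimestampType.forAttributes(attributes)
      def isLogAppendTime(attributes: Byte): Boolean =
        (attributes & TimestampTypeMask) != 0
    }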

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/message/MessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala 
b/core/src/main/scala/kafka/message/MessageSet.scala
index 014788a..14c455c 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -37,30 +37,16 @@ object MessageSet {
     messages.foldLeft(0)(_ + entrySize(_))
 
   /**
-   * The size of a list of messages
-   */
-  def messageSetSize(messages: java.util.List[Message]): Int = {
-    var size = 0
-    val iter = messages.iterator
-    while(iter.hasNext) {
-      val message = iter.next
-      size += entrySize(message)
-    }
-    size
-  }
-  
-  /**
    * The size of a size-delimited entry in a message set
    */
   def entrySize(message: Message): Int = LogOverhead + message.size
 
   /**
-   * Validate the "magic" values of messages are the same in a compressed 
message set and return the magic value of
-   * and the max timestamp of the inner messages.
+   * Validate that all "magic" values in `messages` are the same and return 
their magic value and max timestamp
    */
   def magicAndLargestTimestamp(messages: Seq[Message]): MagicAndTimestamp = {
     val firstMagicValue = messages.head.magic
-    var largestTimestamp: Long = Message.NoTimestamp
+    var largestTimestamp = Message.NoTimestamp
     for (message <- messages) {
       if (message.magic != firstMagicValue)
         throw new IllegalStateException("Messages in the same message set must 
have same magic value")
@@ -92,7 +78,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
   /**
    * Check if all the wrapper messages in the message set have the expected 
magic value
    */
-  def magicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean
+  def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean
 
   /**
    * Provides an iterator over the message/offset pairs in this set

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/message/MessageWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageWriter.scala 
b/core/src/main/scala/kafka/message/MessageWriter.scala
index 660772c..e6954ff 100755
--- a/core/src/main/scala/kafka/message/MessageWriter.scala
+++ b/core/src/main/scala/kafka/message/MessageWriter.scala
@@ -40,7 +40,7 @@ class MessageWriter(segmentSize: Int) extends 
BufferingOutputStream(segmentSize)
       if (codec.codec > 0)
         attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
       if (magicValue > MagicValue_V0)
-        attributes = TimestampType.setTimestampType(attributes, timestampType)
+        attributes = timestampType.updateAttributes(attributes)
       write(attributes)
       // Write timestamp
       if (magicValue > MagicValue_V0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 9343fde..4bdd308 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -20,14 +20,13 @@ package kafka.server
 import java.util.Properties
 
 import kafka.api.ApiVersion
-import kafka.common.TopicAndPartition
-import kafka.log.{Log, LogConfig, LogManager}
+import kafka.log.{LogConfig, LogManager}
 import kafka.utils.Logging
 import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.protocol.ApiKeys
 
-import scala.collection.mutable
 import scala.collection.Map
+import scala.collection.JavaConverters._
 
 /**
  * The ConfigHandler is used to process config change notifications received 
by the DynamicConfigManager
@@ -42,29 +41,27 @@ trait ConfigHandler {
  */
 class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: 
KafkaConfig) extends ConfigHandler with Logging {
 
-  def processConfigChanges(topic : String, topicConfig : Properties) {
-    val logs: mutable.Buffer[(TopicAndPartition, Log)] = 
logManager.logsByTopicPartition.toBuffer
-    val logsByTopic: Map[String, mutable.Buffer[Log]] = logs.groupBy{ case 
(topicAndPartition, log) => topicAndPartition.topic }
-            .mapValues{ case v: mutable.Buffer[(TopicAndPartition, Log)] => 
v.map(_._2) }
+  def processConfigChanges(topic: String, topicConfig: Properties) {
     // Validate the compatibility of message format version.
-    Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)) match {
-      case Some(versionString) =>
-        if 
(!kafkaConfig.interBrokerProtocolVersion.onOrAfter(ApiVersion(versionString))) {
-          topicConfig.remove(LogConfig.MessageFormatVersionProp)
-          warn(s"Log configuration ${LogConfig.MessageFormatVersionProp} is 
ignored for $topic because $versionString " +
-            s"is not compatible with Kafka inter broker protocol version 
${kafkaConfig.interBrokerProtocolVersion}")
-        }
-      case _ =>
+    val configNameToExclude = 
Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)).flatMap { 
versionString =>
+      if (kafkaConfig.interBrokerProtocolVersion < ApiVersion(versionString)) {
+        warn(s"Log configuration ${LogConfig.MessageFormatVersionProp} is 
ignored for `$topic` because `$versionString` " +
+          s"is not compatible with Kafka inter-broker protocol version 
`${kafkaConfig.interBrokerProtocolVersionString}`")
+        Some(LogConfig.MessageFormatVersionProp)
+      }
+      else None
     }
 
-    if (logsByTopic.contains(topic)) {
+    val logs = logManager.logsByTopicPartition.filterKeys(_.topic == 
topic).values.toBuffer
+    if (logs.nonEmpty) {
       /* combine the default properties with the overrides in zk to create the 
new LogConfig */
       val props = new Properties()
       props.putAll(logManager.defaultConfig.originals)
-      props.putAll(topicConfig)
+      topicConfig.asScala.foreach { case (key, value) =>
+        if (key != configNameToExclude) props.put(key, value)
+      }
       val logConfig = LogConfig(props)
-      for (log <- logsByTopic(topic))
-        log.config = logConfig
+      logs.foreach(_.config = logConfig)
     }
   }
 }
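
The rewritten processConfigChanges applies a compatibility rule: a topic-level message.format.version override is honored only when it does not exceed the broker's inter-broker protocol version; otherwise it is dropped from the effective LogConfig and a warning is logged. A self-contained restatement of that rule is sketched below, with a stand-in Version type used in place of kafka.api.ApiVersion.

    // Sketch of the rule only; the real handler works with Properties and ApiVersion.
    case class Version(major: Int, minor: Int) extends Ordered[Version] {
      def compare(that: Version): Int =
        if (major != that.major) major - that.major else minor - that.minor
    }

    // Returns the override to apply, or None when it must be ignored.
    def effectiveMessageFormatOverride(interBrokerProtocol: Version,
                                       topicOverride: Option[Version]): Option[Version] =
      topicOverride.filter(_ <= interBrokerProtocol)

    // e.g. a broker still on a 0.9 inter-broker protocol ignores a 0.10 format override:
    // effectiveMessageFormatOverride(Version(0, 9), Some(Version(0, 10))) == None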

http://git-wip-us.apache.org/repos/asf/kafka/blob/01aeea7c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index bd02630..2a289b4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -368,12 +368,10 @@ class KafkaApis(val requestChannel: RequestChannel,
           val respHeader = new ResponseHeader(request.header.correlationId)
           val respBody = request.header.apiVersion match {
             case 0 => new ProduceResponse(mergedResponseStatus.asJava)
-            case 1 => new ProduceResponse(mergedResponseStatus.asJava, 
delayTimeMs, 1)
-            case 2 => new ProduceResponse(mergedResponseStatus.asJava, 
delayTimeMs, 2)
+            case version@ (1 | 2) => new 
ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
             // This case shouldn't happen unless a new version of 
ProducerRequest is added without
             // updating this part of the code to handle it properly.
-            case _ => throw new IllegalArgumentException("Version %d of 
ProducerRequest is not handled. Code must be updated."
-              .format(request.header.apiVersion))
+            case version => throw new IllegalArgumentException(s"Version 
`$version` of ProduceRequest is not handled. Code must be updated.")
           }
 
           requestChannel.sendResponse(new RequestChannel.Response(request, new 
ResponseSend(request.connectionId, respHeader, respBody)))
@@ -424,52 +422,51 @@ class KafkaApis(val requestChannel: RequestChannel,
       case (topicAndPartition, _) => authorize(request.session, Read, new 
Resource(Topic, topicAndPartition.topic))
     }
 
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => 
FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, 
MessageSet.Empty))
+    val unauthorizedPartitionData = unauthorizedRequestInfo.mapValues { _ =>
+      FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, 
MessageSet.Empty)
+    }
 
     // the callback for sending a fetch response
     def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, 
FetchResponsePartitionData]) {
 
-      val convertedResponseStatus =
+      val convertedPartitionData =
         // Need to down-convert message when consumer only takes magic value 0.
         if (fetchRequest.versionId <= 1) {
-          responsePartitionData.map({ case (tp, data) =>
-            tp -> {
-              // We only do down-conversion when:
-              // 1. The message format version configured for the topic is 
using magic value > 0, and
-              // 2. The message set contains message whose magic > 0
-              // This is to reduce the message format conversion as much as 
possible. The conversion will only occur
-              // when new message format is used for the topic and we see an 
old request.
-              // Please notice that if the message format is changed from a 
higher version back to lower version this
-              // test might break because some messages in new message format 
can be delivered to consumers before 0.10.0.0
-              // without format down conversion.
-              if (replicaManager.getMessageFormatVersion(tp).exists(_ > 
Message.MagicValue_V0) &&
-                  
!data.messages.magicValueInAllWrapperMessages(Message.MagicValue_V0)) {
-                trace("Down converting message to V0 for fetch request from " 
+ fetchRequest.clientId)
-                new FetchResponsePartitionData(data.error, data.hw, 
data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0))
-              } else
-                data
-            }
-          })
-        } else
-          responsePartitionData
+          responsePartitionData.map { case (tp, data) =>
+
+            // We only do down-conversion when:
+            // 1. The message format version configured for the topic is using 
magic value > 0, and
+            // 2. The message set contains message whose magic > 0
+            // This is to reduce the message format conversion as much as 
possible. The conversion will only occur
+            // when new message format is used for the topic and we see an old 
request.
+            // Please note that if the message format is changed from a higher 
version back to lower version this
+            // test might break because some messages in new message format 
can be delivered to consumers before 0.10.0.0
+            // without format down conversion.
+            val convertedData = if 
(replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) &&
+              
!data.messages.isMagicValueInAllWrapperMessages(Message.MagicValue_V0)) {
+              trace(s"Down converting message to V0 for fetch request from 
${fetchRequest.clientId}")
+              new FetchResponsePartitionData(data.error, data.hw, 
data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0))
+            } else data
+
+            tp -> convertedData
+          }
+        } else responsePartitionData
 
-      val mergedResponseStatus = convertedResponseStatus ++ 
unauthorizedResponseStatus
+      val mergedPartitionData = convertedPartitionData ++ 
unauthorizedPartitionData
 
-      mergedResponseStatus.foreach { case (topicAndPartition, data) =>
-        if (data.error != Errors.NONE.code) {
-          debug("Fetch request with correlation id %d from client %s on 
partition %s failed due to %s"
-            .format(fetchRequest.correlationId, fetchRequest.clientId,
-            topicAndPartition, Errors.forCode(data.error).exceptionName))
-        }
+      mergedPartitionData.foreach { case (topicAndPartition, data) =>
+        if (data.error != Errors.NONE.code)
+          debug(s"Fetch request with correlation id 
${fetchRequest.correlationId} from client ${fetchRequest.clientId} " +
+            s"on partition $topicAndPartition failed due to 
${Errors.forCode(data.error).exceptionName}")
         // record the bytes out metrics only when the response is being sent
         
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes)
         
BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes)
       }
 
       def fetchResponseCallback(delayTimeMs: Int) {
-          trace(s"Sending fetch response to ${fetchRequest.clientId} with 
${convertedResponseStatus.values.map(_.messages.sizeInBytes).sum}" +
-            s" bytes")
-        val response = FetchResponse(fetchRequest.correlationId, 
mergedResponseStatus, fetchRequest.versionId, delayTimeMs)
+        trace(s"Sending fetch response to client ${fetchRequest.clientId} of " 
+
+          s"${convertedPartitionData.values.map(_.messages.sizeInBytes).sum} 
bytes")
+        val response = FetchResponse(fetchRequest.correlationId, 
mergedPartitionData, fetchRequest.versionId, delayTimeMs)
         requestChannel.sendResponse(new RequestChannel.Response(request, new 
FetchResponseSend(request.connectionId, response)))
       }
 
@@ -482,7 +479,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         fetchResponseCallback(0)
       } else {
         
quotaManagers(ApiKeys.FETCH.id).recordAndMaybeThrottle(fetchRequest.clientId,
-                                                               
FetchResponse.responseSize(responsePartitionData.groupBy(_._1.topic),
+                                                               
FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic),
                                                                                
           fetchRequest.versionId),
                                                                
fetchResponseCallback)
       }
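
The comment block in the hunk above gives the two conditions under which a fetch response is down-converted to message format V0 for old consumers: the topic must be configured with a newer message format, and the fetched message set must actually contain wrapper messages with magic > 0. Stated on its own, with stand-in parameters instead of ReplicaManager and FetchResponsePartitionData, the predicate is roughly:

    // Illustrative restatement of the down-conversion condition in the fetch handling above.
    // topicMessageFormatMagic: the magic value configured for the topic, if known
    //   (Message.MagicValue_V0 is 0, so "newer format" means magic > 0).
    // allWrapperMessagesAreV0: whether every wrapper message in the fetched set already has magic 0.
    def needsDownConversionToV0(topicMessageFormatMagic: Option[Byte],
                                allWrapperMessagesAreV0: Boolean): Boolean =
      topicMessageFormatMagic.exists(_ > 0) && !allWrapperMessagesAreV0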
