Repository: kafka
Updated Branches:
  refs/heads/trunk 1a67739c2 -> d09043637


KAFKA-4298; Ensure compressed message sets are not converted when log cleaning

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com>

Closes #2019 from hachikuji/KAFKA-4298
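
For reviewers skimming the patch: KAFKA-4298 describes older cleaner builds writing a compressed wrapper message with magic v1 around inner messages that are still magic v0. A rough sketch of how such a mismatch can be spotted with the message-set APIs touched below (the helper name hasMismatchedMagic is illustrative, not part of this commit):

    import kafka.message._

    // Returns true if any compressed wrapper in the set carries inner messages
    // whose magic value differs from the wrapper's (the KAFKA-4298 corruption).
    def hasMismatchedMagic(messages: ByteBufferMessageSet): Boolean =
      messages.shallowIterator.exists { shallow =>
        shallow.message.compressionCodec != NoCompressionCodec &&
          ByteBufferMessageSet.deepIterator(shallow).exists(_.message.magic != shallow.message.magic)
      }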


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0904363
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0904363
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0904363

Branch: refs/heads/trunk
Commit: d09043637d5ea4094e3ee9808f29d037e8afaba6
Parents: 1a67739
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Oct 13 21:08:28 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Oct 13 21:08:28 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  |  82 ++++++++------
 .../kafka/message/ByteBufferMessageSet.scala    |  28 +++--
 .../test/scala/unit/kafka/log/CleanerTest.scala | 110 ++++++++++++++++++-
 .../kafka/log/LogCleanerIntegrationTest.scala   |  56 +++++++++-
 .../message/ByteBufferMessageSetTest.scala      |  52 +++++++++
 5 files changed, 280 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0904363/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 3d9a20d..219957f 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -401,7 +401,7 @@ private[log] class Cleaner(val id: Int,
         val retainDeletes = old.largestTimestamp > deleteHorizonMs
        info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
            .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
-        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion.messageFormatVersion, log.config.maxMessageSize)
+        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize)
       }
 
       // trim excess index
@@ -439,7 +439,6 @@ private[log] class Cleaner(val id: Int,
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
   * @param retainDeletes Should delete tombstones be retained while cleaning this segment
-   * @param messageFormatVersion The message format version to use after compaction
   * @param maxLogMessageSize The maximum message size of the corresponding topic
    */
   private[log] def cleanInto(topicAndPartition: TopicAndPartition,
@@ -447,7 +446,6 @@ private[log] class Cleaner(val id: Int,
                              dest: LogSegment,
                              map: OffsetMap,
                              retainDeletes: Boolean,
-                             messageFormatVersion: Byte,
                              maxLogMessageSize: Int) {
     var position = 0
     while (position < source.log.sizeInBytes) {
@@ -461,41 +459,54 @@ private[log] class Cleaner(val id: Int,
       throttler.maybeThrottle(messages.sizeInBytes)
       // check each message to see if it is to be retained
       var messagesRead = 0
-      for (entry <- messages.shallowIterator) {
-        val size = MessageSet.entrySize(entry.message)
+      for (shallowMessageAndOffset <- messages.shallowIterator) {
+        val shallowMessage = shallowMessageAndOffset.message
+        val shallowOffset = shallowMessageAndOffset.offset
+        val size = MessageSet.entrySize(shallowMessageAndOffset.message)
+
         stats.readMessage(size)
-        if (entry.message.compressionCodec == NoCompressionCodec) {
-          if (shouldRetainMessage(source, map, retainDeletes, entry)) {
-            ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
+        if (shallowMessage.compressionCodec == NoCompressionCodec) {
+          if (shouldRetainMessage(source, map, retainDeletes, shallowMessageAndOffset)) {
+            ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset)
             stats.recopyMessage(size)
-            if (entry.message.timestamp > maxTimestamp) {
-              maxTimestamp = entry.message.timestamp
-              offsetOfMaxTimestamp = entry.offset
+            if (shallowMessage.timestamp > maxTimestamp) {
+              maxTimestamp = shallowMessage.timestamp
+              offsetOfMaxTimestamp = shallowOffset
             }
           }
           messagesRead += 1
         } else {
-          // We use the absolute offset to decide whether to retain the message or not. This is handled by the
-          // deep iterator.
-          val messages = ByteBufferMessageSet.deepIterator(entry)
+          // We use the absolute offset to decide whether to retain the message or not (this is handled by the
+          // deep iterator). Because of KAFKA-4298, we have to allow for the possibility that a previous version
+          // corrupted the log by writing a compressed message set with a wrapper magic value not matching the magic
+          // of the inner messages. This will be fixed as we recopy the messages to the destination segment.
+
           var writeOriginalMessageSet = true
           val retainedMessages = new mutable.ArrayBuffer[MessageAndOffset]
-          messages.foreach { messageAndOffset =>
+          val shallowMagic = shallowMessage.magic
+
+          for (deepMessageAndOffset <- ByteBufferMessageSet.deepIterator(shallowMessageAndOffset)) {
             messagesRead += 1
-            if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset)) {
-              retainedMessages += messageAndOffset
+            if (shouldRetainMessage(source, map, retainDeletes, deepMessageAndOffset)) {
+              // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
+              // the corrupted entry with correct data.
+              if (shallowMagic != deepMessageAndOffset.message.magic)
+                writeOriginalMessageSet = false
+
+              retainedMessages += deepMessageAndOffset
               // We need the max timestamp and last offset for time index
-              if (messageAndOffset.message.timestamp > maxTimestamp)
-                maxTimestamp = messageAndOffset.message.timestamp
+              if (deepMessageAndOffset.message.timestamp > maxTimestamp)
+                maxTimestamp = deepMessageAndOffset.message.timestamp
+            } else {
+              writeOriginalMessageSet = false
             }
-            else writeOriginalMessageSet = false
           }
          offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L
          // There are no messages compacted out and no message format conversion, write the original message set back
           if (writeOriginalMessageSet)
-            ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
+            ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset)
           else
-            compressMessages(writeBuffer, entry.message.compressionCodec, messageFormatVersion, retainedMessages)
+            compressMessages(writeBuffer, shallowMessage.compressionCodec, retainedMessages)
         }
       }
 
@@ -518,29 +529,34 @@ private[log] class Cleaner(val id: Int,
 
   private def compressMessages(buffer: ByteBuffer,
                                compressionCodec: CompressionCodec,
-                               messageFormatVersion: Byte,
                                messageAndOffsets: Seq[MessageAndOffset]) {
    require(compressionCodec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec")
     if (messageAndOffsets.nonEmpty) {
       val messages = messageAndOffsets.map(_.message)
       val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages)
+
+      // ensure that we use the magic from the first message in the set when writing the wrapper
+      // message in order to fix message sets corrupted by KAFKA-4298
+      val magic = magicAndTimestamp.magic
+
       val firstMessageOffset = messageAndOffsets.head
       val firstAbsoluteOffset = firstMessageOffset.offset
       var offset = -1L
       val timestampType = firstMessageOffset.message.timestampType
      val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
-      messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = messageFormatVersion) { outputStream =>
-        val output = new DataOutputStream(CompressionFactory(compressionCodec, messageFormatVersion, outputStream))
+      messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magic) { outputStream =>
+        val output = new DataOutputStream(CompressionFactory(compressionCodec, magic, outputStream))
         try {
-          for (messageOffset <- messageAndOffsets) {
-            val message = messageOffset.message
-            offset = messageOffset.offset
-            if (messageFormatVersion > Message.MagicValue_V0) {
+          for (messageAndOffset <- messageAndOffsets) {
+            offset = messageAndOffset.offset
+            val innerOffset = if (magic > Message.MagicValue_V0)
              // The offset of the messages are absolute offset, compute the inner offset.
-              val innerOffset = messageOffset.offset - firstAbsoluteOffset
-              output.writeLong(innerOffset)
-            } else
-              output.writeLong(offset)
+              messageAndOffset.offset - firstAbsoluteOffset
+            else
+              offset
+
+            val message = messageAndOffset.message
+            output.writeLong(innerOffset)
             output.writeInt(message.size)
              output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
           }
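
Note on the recompression path above: when retained messages are re-wrapped, the offset written for each inner message is relative to the first absolute offset for magic v1 wrappers and absolute for magic v0. A minimal sketch of that rule, reusing names from the hunk (the standalone helper is illustrative only, not part of this commit):

    import kafka.message.{Message, MessageAndOffset}

    // Offset recorded for an inner message when re-wrapping a compressed set:
    // relative for magic v1 and above, absolute for magic v0.
    def innerOffsetToWrite(magic: Byte, entry: MessageAndOffset, firstAbsoluteOffset: Long): Long =
      if (magic > Message.MagicValue_V0) entry.offset - firstAbsoluteOffset
      else entry.offset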

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0904363/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index aadda86..1ef91b9 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -78,7 +78,7 @@ object ByteBufferMessageSet {
   }
 
  /** Deep iterator that decompresses the message sets and adjusts timestamp and offset if needed. */
-  def deepIterator(wrapperMessageAndOffset: MessageAndOffset): Iterator[MessageAndOffset] = {
+  def deepIterator(wrapperMessageAndOffset: MessageAndOffset, ensureMatchingMagic: Boolean = false): Iterator[MessageAndOffset] = {
 
     import Message._
 
@@ -138,12 +138,15 @@ object ByteBufferMessageSet {
         // Override the timestamp if necessary
        val newMessage = new Message(buffer, wrapperMessageTimestampOpt, wrapperMessageTimestampTypeOpt)
 
-        // Inner message and wrapper message must have same magic value
-        if (newMessage.magic != wrapperMessage.magic)
-          throw new IllegalStateException(s"Compressed message has magic value ${wrapperMessage.magic} " +
+        // Due to KAFKA-4298, it is possible for the inner and outer magic values to differ. We ignore
+        // this and depend on the outer message in order to decide how to compute the respective offsets
+        // for the inner messages
+        if (ensureMatchingMagic && newMessage.magic != wrapperMessage.magic)
+          throw new InvalidMessageException(s"Compressed message has magic value ${wrapperMessage.magic} " +
             s"but inner message has magic value ${newMessage.magic}")
+
         lastInnerOffset = innerOffset
-        new MessageAndOffset(newMessage, innerOffset)
+        MessageAndOffset(newMessage, innerOffset)
       }
 
       override def makeNext(): MessageAndOffset = {
@@ -153,7 +156,7 @@ object ByteBufferMessageSet {
             if (wrapperMessage.magic > MagicValue_V0) {
               val relativeOffset = offset - lastInnerOffset
               val absoluteOffset = wrapperMessageOffset + relativeOffset
-              new MessageAndOffset(message, absoluteOffset)
+              MessageAndOffset(message, absoluteOffset)
             } else {
               nextMessage
             }
@@ -328,10 +331,10 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
   override def iterator: Iterator[MessageAndOffset] = internalIterator()
 
   /** iterator over compressed messages without decompressing */
-  def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true)
+  def shallowIterator: Iterator[MessageAndOffset] = internalIterator(isShallow = true)
 
  /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. **/
-  private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
+  private def internalIterator(isShallow: Boolean = false, ensureMatchingMagic: Boolean = false): Iterator[MessageAndOffset] = {
     new IteratorTemplate[MessageAndOffset] {
       var topIter = buffer.slice()
       var innerIter: Iterator[MessageAndOffset] = null
@@ -357,14 +360,14 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
         topIter.position(topIter.position + size)
         val newMessage = new Message(message)
         if(isShallow) {
-          new MessageAndOffset(newMessage, offset)
+          MessageAndOffset(newMessage, offset)
         } else {
           newMessage.compressionCodec match {
             case NoCompressionCodec =>
               innerIter = null
-              new MessageAndOffset(newMessage, offset)
+              MessageAndOffset(newMessage, offset)
             case _ =>
-              innerIter = ByteBufferMessageSet.deepIterator(new MessageAndOffset(newMessage, offset))
+              innerIter = ByteBufferMessageSet.deepIterator(MessageAndOffset(newMessage, offset), ensureMatchingMagic)
               if(!innerIter.hasNext)
                 innerIter = null
               makeNext()
@@ -435,7 +438,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
       var offsetOfMaxTimestamp = -1L
       val expectedInnerOffset = new LongRef(0)
       val validatedMessages = new mutable.ArrayBuffer[Message]
-      this.internalIterator(isShallow = false).foreach { messageAndOffset =>
+
+      this.internalIterator(isShallow = false, ensureMatchingMagic = true).foreach { messageAndOffset =>
         val message = messageAndOffset.message
         validateMessageKey(message, compactedTopic)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0904363/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index f4458a0..536f10d 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.log
 
-import java.io.File
+import java.io.{DataOutputStream, File}
 import java.nio._
 import java.nio.file.Paths
 import java.util.Properties
@@ -25,6 +25,7 @@ import java.util.Properties
 import kafka.common._
 import kafka.message._
 import kafka.utils._
+import org.apache.kafka.common.record.{MemoryRecords, TimestampType}
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit.{After, Test}
@@ -140,7 +141,7 @@ class CleanerTest extends JUnitSuite {
   @Test
   def testPartialSegmentClean(): Unit = {
     // because loadFactor is 0.75, this means we can fit 2 messages in the map
-    var cleaner = makeCleaner(2)
+    val cleaner = makeCleaner(2)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
 
@@ -610,11 +611,116 @@ class CleanerTest extends JUnitSuite {
     assertEquals(-1, map.get(key(4)))
   }
 
+  /**
+   * This test verifies that messages corrupted by KAFKA-4298 are fixed by the cleaner
+   */
+  @Test
+  def testCleanCorruptMessageSet() {
+    val codec = SnappyCompressionCodec
+
+    val logProps = new Properties()
+    logProps.put(LogConfig.CompressionTypeProp, codec.name)
+    val logConfig = LogConfig(logProps)
+
+    val log = makeLog(config = logConfig)
+    val cleaner = makeCleaner(10)
+
+    // messages are constructed so that the payload matches the expecting offset to
+    // make offset validation easier after cleaning
+
+    // one compressed log entry with duplicates
+    val dupSetKeys = (0 until 2) ++ (0 until 2)
+    val dupSetOffset = 25
+    val dupSet = dupSetKeys zip (dupSetOffset until dupSetOffset + dupSetKeys.size)
+
+    // and one without (should still be fixed by the cleaner)
+    val noDupSetKeys = 3 until 5
+    val noDupSetOffset = 50
+    val noDupSet = noDupSetKeys zip (noDupSetOffset until noDupSetOffset + noDupSetKeys.size)
+
+    log.append(invalidCleanedMessage(dupSetOffset, dupSet, codec), assignOffsets = false)
+    log.append(invalidCleanedMessage(noDupSetOffset, noDupSet, codec), assignOffsets = false)
+
+    log.roll()
+
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
+
+    for (segment <- log.logSegments; shallowMessage <- segment.log.iterator; deepMessage <- ByteBufferMessageSet.deepIterator(shallowMessage)) {
+      assertEquals(shallowMessage.message.magic, deepMessage.message.magic)
+      val value = TestUtils.readString(deepMessage.message.payload).toLong
+      assertEquals(deepMessage.offset, value)
+    }
+  }
+
+  /**
+   * Verify that the client can handle corrupted messages. Located here for now since the client
+   * does not support writing messages with the old magic.
+   */
+  @Test
+  def testClientHandlingOfCorruptMessageSet(): Unit = {
+    import JavaConverters._
+
+    val keys = 1 until 10
+    val offset = 50
+    val set = keys zip (offset until offset + keys.size)
+
+    val corruptedMessage = invalidCleanedMessage(offset, set)
+    val records = MemoryRecords.readableRecords(corruptedMessage.buffer)
+
+    for (logEntry <- records.iterator.asScala) {
+      val offset = logEntry.offset
+      val value = TestUtils.readString(logEntry.record.value).toLong
+      assertEquals(offset, value)
+    }
+  }
+
  private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
      yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
   }
 
+  private def invalidCleanedMessage(initialOffset: Long,
+                                    keysAndValues: Iterable[(Int, Int)],
+                                    codec: CompressionCodec = SnappyCompressionCodec): ByteBufferMessageSet = {
+    // this function replicates the old versions of the cleaner which under some circumstances
+    // would write invalid compressed message sets with the outer magic set to 1 and the inner
+    // magic set to 0
+
+    val messages = keysAndValues.map(kv =>
+      new Message(key = kv._1.toString.getBytes,
+        bytes = kv._2.toString.getBytes,
+        timestamp = Message.NoTimestamp,
+        magicValue = Message.MagicValue_V0))
+
+    val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
+    var lastOffset = initialOffset
+
+    messageWriter.write(
+      codec = codec,
+      timestamp = Message.NoTimestamp,
+      timestampType = TimestampType.CREATE_TIME,
+      magicValue = Message.MagicValue_V1) { outputStream =>
+
+      val output = new DataOutputStream(CompressionFactory(codec, Message.MagicValue_V1, outputStream))
+      try {
+        for (message <- messages) {
+          val innerOffset = lastOffset - initialOffset
+          output.writeLong(innerOffset)
+          output.writeInt(message.size)
+          output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
+          lastOffset += 1
+        }
+      } finally {
+        output.close()
+      }
+    }
+    val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
+    ByteBufferMessageSet.writeMessage(buffer, messageWriter, lastOffset - 1)
+    buffer.rewind()
+
+    new ByteBufferMessageSet(buffer)
+  }
+
   private def messageWithOffset(key: Int, value: Int, offset: Long) =
     new ByteBufferMessageSet(NoCompressionCodec, Seq(offset),
                              new Message(key = key.toString.getBytes,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0904363/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 9e4951a..40030cb 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -191,6 +191,43 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     checkLogAfterAppendingDups(log, startSize, appends2)
   }
 
+  @Test
+  def testCleaningNestedMessagesWithMultipleVersions(): Unit = {
+    val maxMessageSize = 192
+    cleaner = makeCleaner(parts = 3, maxMessageSize = maxMessageSize)
+
+    val log = cleaner.logs.get(topics(0))
+    val props = logConfigProperties(maxMessageSize = maxMessageSize)
+    props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
+    log.config = new LogConfig(props)
+
+    // with compression enabled, these messages will be written as a single message containing
+    // all of the individual messages
+    var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0)
+    appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V0)
+
+    props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
+    log.config = new LogConfig(props)
+
+    var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1)
+    appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1)
+    appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1)
+
+    val appends = appendsV0 ++ appendsV1
+
+    val startSize = log.size
+    cleaner.startup()
+
+    val firstDirty = log.activeSegment.baseOffset
+    assertTrue(firstDirty > appendsV0.size) // ensure we clean data from V0 and V1
+
+    checkLastCleaned("log", 0, firstDirty)
+    val compactedSize = log.logSegments.map(_.size).sum
+    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
+
+    checkLogAfterAppendingDups(log, startSize, appends)
+  }
+
  private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: Long) {
    // wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than
     // LogConfig.MinCleanableDirtyRatioProp
@@ -230,7 +267,24 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
       (key, payload)
     }
   }
-    
+
+  private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec,
+                                        startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String)] = {
+    val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
+      val payload = counter.toString
+      counter += 1
+      (key, payload)
+    }
+
+    val messages = kvs.map { case (key, payload) =>
+      new Message(payload.toString.getBytes, key.toString.getBytes, Message.NoTimestamp, magicValue)
+    }
+
+    val messageSet = new ByteBufferMessageSet(compressionCodec = codec, messages: _*)
+    log.append(messageSet, assignOffsets = true)
+    kvs
+  }
+
   @After
   def tearDown(): Unit = {
     cleaner.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0904363/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 39eb84c..18a023c 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.message
 
+import java.io.DataOutputStream
 import java.nio._
 
 import kafka.common.LongRef
@@ -364,6 +365,19 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     checkOffsets(compressedMessagesWithOffset, offset)
   }
 
+  @Test(expected = classOf[InvalidMessageException])
+  def testInvalidInnerMagicVersion(): Unit = {
+    val offset = 1234567
+    val messages = messageSetWithInvalidInnerMagic(SnappyCompressionCodec, offset)
+    messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = SnappyCompressionCodec,
+      targetCodec = SnappyCompressionCodec,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 5000L).validatedMessages
+  }
+
+
   @Test
   def testOffsetAssignmentAfterMessageFormatConversion() {
     // Check up conversion
@@ -460,4 +474,42 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
        new Message("beautiful".getBytes, timestamp = timestamp, magicValue = Message.MagicValue_V1))
     }
   }
+
+  private def messageSetWithInvalidInnerMagic(codec: CompressionCodec,
+                                              initialOffset: Long): ByteBufferMessageSet = {
+    val messages = (0 until 20).map(id =>
+      new Message(key = id.toString.getBytes,
+        bytes = id.toString.getBytes,
+        timestamp = Message.NoTimestamp,
+        magicValue = Message.MagicValue_V0))
+
+    val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
+    var lastOffset = initialOffset
+
+    messageWriter.write(
+      codec = codec,
+      timestamp = System.currentTimeMillis(),
+      timestampType = TimestampType.CREATE_TIME,
+      magicValue = Message.MagicValue_V1) { outputStream =>
+
+      val output = new DataOutputStream(CompressionFactory(codec, Message.MagicValue_V1, outputStream))
+      try {
+        for (message <- messages) {
+          val innerOffset = lastOffset - initialOffset
+          output.writeLong(innerOffset)
+          output.writeInt(message.size)
+          output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
+          lastOffset += 1
+        }
+      } finally {
+        output.close()
+      }
+    }
+    val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
+    ByteBufferMessageSet.writeMessage(buffer, messageWriter, lastOffset - 1)
+    buffer.rewind()
+
+    new ByteBufferMessageSet(buffer)
+  }
+
 }
