http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index d18719a..fcf9c89 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -22,15 +22,16 @@ import java.util.Properties import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException} import kafka.api.ApiVersion -import kafka.common.LongRef import org.junit.Assert._ import org.scalatest.junit.JUnitSuite import org.junit.{After, Before, Test} -import kafka.message._ import kafka.utils._ import kafka.server.KafkaConfig +import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils +import scala.collection.JavaConverters._ + class LogTest extends JUnitSuite { val tmpDir = TestUtils.tempDir() @@ -63,7 +64,7 @@ class LogTest extends JUnitSuite { */ @Test def testTimeBasedLogRoll() { - val set = TestUtils.singleMessageSet("test".getBytes) + val set = TestUtils.singletonRecords("test".getBytes) val logProps = new Properties() logProps.put(LogConfig.SegmentMsProp, (1 * 60 * 60L): java.lang.Long) @@ -91,7 +92,7 @@ class LogTest extends JUnitSuite { // Append a message with timestamp to a segment whose first messgae do not have a timestamp. val setWithTimestamp = - TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + log.config.segmentMs + 1) + TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + log.config.segmentMs + 1) log.append(setWithTimestamp) assertEquals("Segment should not have been rolled out because the log rolling should be based on wall clock.", 4, log.numberOfSegments) @@ -105,14 +106,14 @@ class LogTest extends JUnitSuite { log.append(setWithTimestamp) assertEquals("Log should not roll because the roll should depend on timestamp of the first message.", 5, log.numberOfSegments) - val setWithExpiredTimestamp = TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds) + val setWithExpiredTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) log.append(setWithExpiredTimestamp) assertEquals("Log should roll because the timestamp in the message should make the log segment expire.", 6, log.numberOfSegments) val numSegments = log.numberOfSegments time.sleep(log.config.segmentMs + 1) - log.append(new ByteBufferMessageSet()) - assertEquals("Appending an empty message set should not roll log even if succient time has passed.", numSegments, log.numberOfSegments) + log.append(MemoryRecords.withLogEntries()) + assertEquals("Appending an empty message set should not roll log even if sufficient time has passed.", numSegments, log.numberOfSegments) } /** @@ -121,7 +122,7 @@ class LogTest extends JUnitSuite { */ @Test def testTimeBasedLogRollJitter() { - val set = TestUtils.singleMessageSet("test".getBytes) + val set = TestUtils.singletonRecords("test".getBytes) val maxJitter = 20 * 60L val logProps = new Properties() @@ -149,7 +150,7 @@ class LogTest extends JUnitSuite { */ @Test def testSizeBasedLogRoll() { - val set = TestUtils.singleMessageSet("test".getBytes) + val set = TestUtils.singletonRecords("test".getBytes) val setSize = set.sizeInBytes val msgPerSeg = 10 val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages @@ -176,7 +177,7 @@ class LogTest extends JUnitSuite { def testLoadEmptyLog() { createEmptyLogs(logDir, 0) val log = new Log(logDir, logConfig, recoveryPoint = 0L, time.scheduler, time = time) - log.append(TestUtils.singleMessageSet("test".getBytes)) + log.append(TestUtils.singletonRecords("test".getBytes)) } /** @@ -189,16 +190,17 @@ class LogTest extends JUnitSuite { // We use need to use magic value 1 here because the test is message size sensitive. logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString) val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) - val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray + val records = (0 until 100 by 2).map(id => Record.create(id.toString.getBytes)).toArray + + for(i <- records.indices) + log.append(MemoryRecords.withRecords(records(i))) - for(i <- 0 until messages.length) - log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i))) - for(i <- 0 until messages.length) { - val read = log.read(i, 100, Some(i+1)).messageSet.head + for(i <- records.indices) { + val read = log.read(i, 100, Some(i+1)).records.shallowIterator.next() assertEquals("Offset read should match order appended.", i, read.offset) - assertEquals("Message should match appended.", messages(i), read.message) + assertEquals("Message should match appended.", records(i), read.record) } - assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).messageSet.size) + assertEquals("Reading beyond the last message returns nothing.", 0, log.read(records.length, 100, None).records.shallowIterator.asScala.size) } /** @@ -211,16 +213,16 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer) val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray - val messages = messageIds.map(id => new Message(id.toString.getBytes)) + val records = messageIds.map(id => Record.create(id.toString.getBytes)) // now test the case that we give the offsets and use non-sequential offsets - for(i <- 0 until messages.length) - log.append(new ByteBufferMessageSet(NoCompressionCodec, new LongRef(messageIds(i)), messages = messages(i)), assignOffsets = false) + for(i <- records.indices) + log.append(MemoryRecords.withLogEntries(LogEntry.create(messageIds(i), records(i))), assignOffsets = false) for(i <- 50 until messageIds.max) { val idx = messageIds.indexWhere(_ >= i) - val read = log.read(i, 100, None).messageSet.head + val read = log.read(i, 100, None).records.shallowIterator.next() assertEquals("Offset read should match message id.", messageIds(idx), read.offset) - assertEquals("Message should match appended.", messages(idx), read.message) + assertEquals("Message should match appended.", records(idx), read.record) } } @@ -238,12 +240,12 @@ class LogTest extends JUnitSuite { // keep appending until we have two segments with only a single message in the second segment while(log.numberOfSegments == 1) - log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes))) + log.append(MemoryRecords.withRecords(Record.create("42".getBytes))) // now manually truncate off all but one message from the first segment to create a gap in the messages log.logSegments.head.truncateTo(1) - assertEquals("A read should now return the last message in the log", log.logEndOffset - 1, log.read(1, 200, None).messageSet.head.offset) + assertEquals("A read should now return the last message in the log", log.logEndOffset - 1, log.read(1, 200, None).records.shallowIterator.next().offset) } @Test @@ -252,12 +254,11 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer) val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray - val messages = messageIds.map(id => new Message(id.toString.getBytes)) + val records = messageIds.map(id => Record.create(id.toString.getBytes)) // now test the case that we give the offsets and use non-sequential offsets - for (i <- 0 until messages.length) - log.append(new ByteBufferMessageSet(NoCompressionCodec, new LongRef(messageIds(i)), messages = messages(i)), - assignOffsets = false) + for (i <- records.indices) + log.append(MemoryRecords.withLogEntries(LogEntry.create(messageIds(i), records(i))), assignOffsets = false) for (i <- 50 until messageIds.max) { val idx = messageIds.indexWhere(_ >= i) @@ -265,13 +266,13 @@ class LogTest extends JUnitSuite { log.read(i, 1, minOneMessage = true), log.read(i, 100, minOneMessage = true), log.read(i, 100, Some(10000), minOneMessage = true) - ).map(_.messageSet.head) + ).map(_.records.shallowIterator.next()) reads.foreach { read => assertEquals("Offset read should match message id.", messageIds(idx), read.offset) - assertEquals("Message should match appended.", messages(idx), read.message) + assertEquals("Message should match appended.", records(idx), read.record) } - assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).messageSet.toIndexedSeq) + assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).records.shallowIterator.asScala.toIndexedSeq) } } @@ -282,15 +283,14 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer) val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray - val messages = messageIds.map(id => new Message(id.toString.getBytes)) + val records = messageIds.map(id => Record.create(id.toString.getBytes)) // now test the case that we give the offsets and use non-sequential offsets - for (i <- 0 until messages.length) - log.append(new ByteBufferMessageSet(NoCompressionCodec, new LongRef(messageIds(i)), messages = messages(i)), - assignOffsets = false) + for (i <- records.indices) + log.append(MemoryRecords.withLogEntries(LogEntry.create(messageIds(i), records(i))), assignOffsets = false) for (i <- 50 until messageIds.max) { - assertEquals(MessageSet.Empty, log.read(i, 0).messageSet) + assertEquals(MemoryRecords.EMPTY, log.read(i, 0).records) // we return an incomplete message instead of an empty one for the case below // we use this mechanism to tell consumers of the fetch request version 2 and below that the message size is @@ -298,9 +298,9 @@ class LogTest extends JUnitSuite { // in fetch request version 3, we no longer need this as we return oversized messages from the first non-empty // partition val fetchInfo = log.read(i, 1) - assertTrue(fetchInfo.firstMessageSetIncomplete) - assertTrue(fetchInfo.messageSet.isInstanceOf[FileMessageSet]) - assertEquals(1, fetchInfo.messageSet.sizeInBytes) + assertTrue(fetchInfo.firstEntryIncomplete) + assertTrue(fetchInfo.records.isInstanceOf[FileRecords]) + assertEquals(1, fetchInfo.records.sizeInBytes) } } @@ -318,9 +318,9 @@ class LogTest extends JUnitSuite { // set up replica log starting with offset 1024 and with one message (at offset 1024) logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) - log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes))) + log.append(MemoryRecords.withRecords(Record.create("42".getBytes))) - assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).messageSet.sizeInBytes) + assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes) try { log.read(0, 1000) @@ -336,7 +336,7 @@ class LogTest extends JUnitSuite { case _: OffsetOutOfRangeException => // This is good. } - assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0, log.read(1025, 1000, Some(1024)).messageSet.sizeInBytes) + assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0, log.read(1025, 1000, Some(1024)).records.sizeInBytes) } /** @@ -350,21 +350,22 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) val numMessages = 100 - val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes)) + val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(i.toString.getBytes)) messageSets.foreach(log.append(_)) log.flush /* do successive reads to ensure all our messages are there */ var offset = 0L for(i <- 0 until numMessages) { - val messages = log.read(offset, 1024*1024).messageSet - assertEquals("Offsets not equal", offset, messages.head.offset) - assertEquals("Messages not equal at offset " + offset, messageSets(i).head.message, - messages.head.message.toFormatVersion(messageSets(i).head.message.magic)) - offset = messages.head.offset + 1 + val messages = log.read(offset, 1024*1024).records.shallowIterator + val head = messages.next() + assertEquals("Offsets not equal", offset, head.offset) + assertEquals("Messages not equal at offset " + offset, messageSets(i).shallowIterator.next().record, + head.record.convert(messageSets(i).shallowIterator.next().record.magic)) + offset = head.offset + 1 } - val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).messageSet - assertEquals("Should be no more messages", 0, lastRead.size) + val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).records + assertEquals("Should be no more messages", 0, lastRead.shallowIterator.asScala.size) // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure TestUtils.retry(1000L){ @@ -383,10 +384,10 @@ class LogTest extends JUnitSuite { val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ - log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) - log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes))) + log.append(MemoryRecords.withRecords(CompressionType.GZIP, Record.create("hello".getBytes), Record.create("there".getBytes))) + log.append(MemoryRecords.withRecords(CompressionType.GZIP, Record.create("alpha".getBytes), Record.create("beta".getBytes))) - def read(offset: Int) = ByteBufferMessageSet.deepIterator(log.read(offset, 4096).messageSet.head) + def read(offset: Int) = log.read(offset, 4096).records.deepIterator /* we should always get the first message in the compressed set when reading any offset in the set */ assertEquals("Read at offset 0 should produce 0", 0, read(0).next().offset) @@ -408,7 +409,7 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer) val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) for(i <- 0 until messagesToAppend) - log.append(TestUtils.singleMessageSet(payload = i.toString.getBytes, timestamp = time.milliseconds - 10)) + log.append(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10)) val currOffset = log.logEndOffset assertEquals(currOffset, messagesToAppend) @@ -422,7 +423,7 @@ class LogTest extends JUnitSuite { assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset) assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append", currOffset, - log.append(TestUtils.singleMessageSet("hello".getBytes)).firstOffset) + log.append(TestUtils.singletonRecords("hello".getBytes)).firstOffset) // cleanup the log log.delete() @@ -435,7 +436,7 @@ class LogTest extends JUnitSuite { */ @Test def testMessageSetSizeCheck() { - val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes)) + val messageSet = MemoryRecords.withRecords(Record.create("You".getBytes), Record.create("bethe".getBytes)) // append messages to log val configSegmentSize = messageSet.sizeInBytes - 1 val logProps = new Properties() @@ -454,17 +455,17 @@ class LogTest extends JUnitSuite { @Test def testCompactedTopicConstraints() { - val keyedMessage = new Message(bytes = "this message has a key".getBytes, key = "and here it is".getBytes, Message.NoTimestamp, Message.CurrentMagicValue) - val anotherKeyedMessage = new Message(bytes = "this message also has a key".getBytes, key ="another key".getBytes, Message.NoTimestamp, Message.CurrentMagicValue) - val unkeyedMessage = new Message(bytes = "this message does not have a key".getBytes) + val keyedMessage = Record.create(Record.CURRENT_MAGIC_VALUE, Record.NO_TIMESTAMP, "and here it is".getBytes, "this message has a key".getBytes) + val anotherKeyedMessage = Record.create(Record.CURRENT_MAGIC_VALUE, Record.NO_TIMESTAMP, "another key".getBytes, "this message also has a key".getBytes) + val unkeyedMessage = Record.create("this message does not have a key".getBytes) - val messageSetWithUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage, keyedMessage) - val messageSetWithOneUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage) - val messageSetWithCompressedKeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage) - val messageSetWithCompressedUnkeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage, unkeyedMessage) + val messageSetWithUnkeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, unkeyedMessage, keyedMessage) + val messageSetWithOneUnkeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, unkeyedMessage) + val messageSetWithCompressedKeyedMessage = MemoryRecords.withRecords(CompressionType.GZIP, keyedMessage) + val messageSetWithCompressedUnkeyedMessage = MemoryRecords.withRecords(CompressionType.GZIP, keyedMessage, unkeyedMessage) - val messageSetWithKeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage) - val messageSetWithKeyedMessages = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage, anotherKeyedMessage) + val messageSetWithKeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage) + val messageSetWithKeyedMessages = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage, anotherKeyedMessage) val logProps = new Properties() logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) @@ -502,8 +503,8 @@ class LogTest extends JUnitSuite { */ @Test def testMessageSizeCheck() { - val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes)) - val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change (I need more bytes)".getBytes)) + val first = MemoryRecords.withRecords(CompressionType.NONE, Record.create("You".getBytes), Record.create("bethe".getBytes)) + val second = MemoryRecords.withRecords(CompressionType.NONE, Record.create("change (I need more bytes)".getBytes)) // append messages to log val maxMessageSize = second.sizeInBytes - 1 @@ -537,7 +538,7 @@ class LogTest extends JUnitSuite { val config = LogConfig(logProps) var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) for(i <- 0 until numMessages) - log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(messageSize), + log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize), timestamp = time.milliseconds + i * 10)) assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset) val lastIndexOffset = log.activeSegment.index.lastOffset @@ -585,7 +586,7 @@ class LogTest extends JUnitSuite { val log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) val messages = (0 until numMessages).map { i => - new ByteBufferMessageSet(NoCompressionCodec, new LongRef(100 + i), new Message(i.toString.getBytes(), time.milliseconds + i, Message.MagicValue_V1)) + MemoryRecords.withLogEntries(LogEntry.create(100 + i, Record.create(Record.MAGIC_VALUE_V1, time.milliseconds + i, i.toString.getBytes()))) } messages.foreach(log.append(_, assignOffsets = false)) val timeIndexEntries = log.logSegments.foldLeft(0) { (entries, segment) => entries + segment.timeIndex.entries } @@ -608,7 +609,7 @@ class LogTest extends JUnitSuite { val config = LogConfig(logProps) var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) for(i <- 0 until numMessages) - log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) + log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) val indexFiles = log.logSegments.map(_.index.file) val timeIndexFiles = log.logSegments.map(_.timeIndex.file) log.close() @@ -623,7 +624,7 @@ class LogTest extends JUnitSuite { assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0) assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0) for(i <- 0 until numMessages) { - assertEquals(i, log.read(i, 100, None).messageSet.head.offset) + assertEquals(i, log.read(i, 100, None).records.shallowIterator.next().offset) if (i == 0) assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset) else @@ -647,7 +648,7 @@ class LogTest extends JUnitSuite { val config = LogConfig(logProps) var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) for(i <- 0 until numMessages) - log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) + log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) val timeIndexFiles = log.logSegments.map(_.timeIndex.file) log.close() @@ -676,7 +677,7 @@ class LogTest extends JUnitSuite { val config = LogConfig(logProps) var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) for(i <- 0 until numMessages) - log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) + log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10)) val indexFiles = log.logSegments.map(_.index.file) val timeIndexFiles = log.logSegments.map(_.timeIndex.file) log.close() @@ -699,7 +700,7 @@ class LogTest extends JUnitSuite { log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) for(i <- 0 until numMessages) { - assertEquals(i, log.read(i, 100, None).messageSet.head.offset) + assertEquals(i, log.read(i, 100, None).records.shallowIterator.next().offset) if (i == 0) assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset) else @@ -713,7 +714,7 @@ class LogTest extends JUnitSuite { */ @Test def testTruncateTo() { - val set = TestUtils.singleMessageSet("test".getBytes) + val set = TestUtils.singletonRecords("test".getBytes) val setSize = set.sizeInBytes val msgPerSeg = 10 val segmentSize = msgPerSeg * setSize // each segment will be 10 messages @@ -770,7 +771,7 @@ class LogTest extends JUnitSuite { */ @Test def testIndexResizingAtTruncation() { - val setSize = TestUtils.singleMessageSet(payload = "test".getBytes).sizeInBytes + val setSize = TestUtils.singletonRecords(value = "test".getBytes).sizeInBytes val msgPerSeg = 10 val segmentSize = msgPerSeg * setSize // each segment will be 10 messages val logProps = new Properties() @@ -781,12 +782,12 @@ class LogTest extends JUnitSuite { assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) - log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i)) + log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i)) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) time.sleep(msgPerSeg) for (i<- 1 to msgPerSeg) - log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i)) + log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i)) assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments) val expectedEntries = msgPerSeg - 1 @@ -800,7 +801,7 @@ class LogTest extends JUnitSuite { time.sleep(msgPerSeg) for (i<- 1 to msgPerSeg) - log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i)) + log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i)) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) } @@ -814,7 +815,7 @@ class LogTest extends JUnitSuite { val bogusIndex2 = Log.indexFilename(logDir, 5) val bogusTimeIndex2 = Log.timeIndexFilename(logDir, 5) - val set = TestUtils.singleMessageSet("test".getBytes) + val set = TestUtils.singletonRecords("test".getBytes) val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer) logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) @@ -842,7 +843,7 @@ class LogTest extends JUnitSuite { */ @Test def testReopenThenTruncate() { - val set = TestUtils.singleMessageSet("test".getBytes) + val set = TestUtils.singletonRecords("test".getBytes) val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer) logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) @@ -875,7 +876,7 @@ class LogTest extends JUnitSuite { */ @Test def testAsyncDelete() { - val set = TestUtils.singleMessageSet("test".getBytes) + val set = TestUtils.singletonRecords("test".getBytes) val asyncDeleteMs = 1000 val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer) @@ -921,7 +922,7 @@ class LogTest extends JUnitSuite { */ @Test def testOpenDeletesObsoleteFiles() { - val set = TestUtils.singleMessageSet("test".getBytes) + val set = TestUtils.singletonRecords("test".getBytes) val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer) logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) @@ -957,10 +958,10 @@ class LogTest extends JUnitSuite { recoveryPoint = 0L, time.scheduler, time) - log.append(new ByteBufferMessageSet(new Message(bytes = null))) - val messageSet = log.read(0, 4096, None).messageSet - assertEquals(0, messageSet.head.offset) - assertTrue("Message payload should be null.", messageSet.head.message.isNull) + log.append(MemoryRecords.withRecords(Record.create(null))) + val head = log.read(0, 4096, None).records.shallowIterator().next() + assertEquals(0, head.offset) + assertTrue("Message payload should be null.", head.record.hasNullValue) } @Test(expected = classOf[IllegalArgumentException]) @@ -970,9 +971,9 @@ class LogTest extends JUnitSuite { recoveryPoint = 0L, time.scheduler, time) - val messages = (0 until 2).map(id => new Message(id.toString.getBytes)).toArray - messages.foreach(message => log.append(new ByteBufferMessageSet(message))) - val invalidMessage = new ByteBufferMessageSet(new Message(1.toString.getBytes)) + val messages = (0 until 2).map(id => Record.create(id.toString.getBytes)).toArray + messages.foreach(record => log.append(MemoryRecords.withRecords(record))) + val invalidMessage = MemoryRecords.withRecords(Record.create(1.toString.getBytes)) log.append(invalidMessage, assignOffsets = false) } @@ -984,7 +985,7 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer) val config = LogConfig(logProps) - val set = TestUtils.singleMessageSet("test".getBytes) + val set = TestUtils.singletonRecords("test".getBytes) val recoveryPoint = 50L for (_ <- 0 until 50) { // create a log and write some messages to it @@ -997,7 +998,7 @@ class LogTest extends JUnitSuite { val numMessages = 50 + TestUtils.random.nextInt(50) for (_ <- 0 until numMessages) log.append(set) - val messages = log.logSegments.flatMap(_.log.iterator.toList) + val messages = log.logSegments.flatMap(_.log.deepIterator.asScala.toList) log.close() // corrupt index and log by appending random bytes @@ -1007,7 +1008,8 @@ class LogTest extends JUnitSuite { // attempt recovery log = new Log(logDir, config, recoveryPoint, time.scheduler, time) assertEquals(numMessages, log.logEndOffset) - assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList)) + assertEquals("Messages in the log after recovery should be the same.", messages, + log.logSegments.flatMap(_.log.deepIterator.asScala.toList)) Utils.delete(logDir) } } @@ -1020,7 +1022,7 @@ class LogTest extends JUnitSuite { logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer) logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) val config = LogConfig(logProps) - val set = TestUtils.singleMessageSet("test".getBytes) + val set = TestUtils.singletonRecords("test".getBytes) val parentLogDir = logDir.getParentFile assertTrue("Data directory %s must exist", parentLogDir.isDirectory) val cleanShutdownFile = new File(parentLogDir, Log.CleanShutdownFile) @@ -1121,7 +1123,7 @@ class LogTest extends JUnitSuite { @Test def testDeleteOldSegmentsMethod() { - val set = TestUtils.singleMessageSet("test".getBytes) + val set = TestUtils.singletonRecords("test".getBytes) val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer) logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) @@ -1154,7 +1156,7 @@ class LogTest extends JUnitSuite { @Test def shouldDeleteSizeBasedSegments() { - val set = TestUtils.singleMessageSet("test".getBytes) + val set = TestUtils.singletonRecords("test".getBytes) val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10) // append some messages to create some segments @@ -1167,7 +1169,7 @@ class LogTest extends JUnitSuite { @Test def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() { - val set = TestUtils.singleMessageSet("test".getBytes) + val set = TestUtils.singletonRecords("test".getBytes) val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 15) // append some messages to create some segments @@ -1180,7 +1182,7 @@ class LogTest extends JUnitSuite { @Test def shouldDeleteTimeBasedSegmentsReadyToBeDeleted() { - val set = TestUtils.singleMessageSet("test".getBytes, timestamp = 10) + val set = TestUtils.singletonRecords("test".getBytes, timestamp = 10) val log = createLog(set.sizeInBytes, retentionMs = 10000) // append some messages to create some segments @@ -1193,7 +1195,7 @@ class LogTest extends JUnitSuite { @Test def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() { - val set = TestUtils.singleMessageSet("test".getBytes, timestamp = time.milliseconds) + val set = TestUtils.singletonRecords("test".getBytes, timestamp = time.milliseconds) val log = createLog(set.sizeInBytes, retentionMs = 10000000) // append some messages to create some segments @@ -1206,7 +1208,7 @@ class LogTest extends JUnitSuite { @Test def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() { - val set = TestUtils.singleMessageSet("test".getBytes, key = "test".getBytes(), timestamp = 10L) + val set = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L) val log = createLog(set.sizeInBytes, retentionMs = 10000, cleanupPolicy = "compact") @@ -1225,7 +1227,7 @@ class LogTest extends JUnitSuite { @Test def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() { - val set = TestUtils.singleMessageSet("test".getBytes, key = "test".getBytes,timestamp = 10L) + val set = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes,timestamp = 10L) val log = createLog(set.sizeInBytes, retentionMs = 10000, cleanupPolicy = "compact,delete")
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala new file mode 100644 index 0000000..72c5b16 --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -0,0 +1,395 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log + +import java.nio.ByteBuffer + +import kafka.common.LongRef +import kafka.message._ +import org.apache.kafka.common.errors.InvalidTimestampException +import org.apache.kafka.common.record._ +import org.junit.Assert._ +import org.junit.Test +import org.scalatest.junit.JUnitSuite + +import scala.collection.JavaConverters._ + +class LogValidatorTest extends JUnitSuite { + + @Test + def testLogAppendTimeNonCompressed() { + val now = System.currentTimeMillis() + // The timestamps should be overwritten + val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = 0L, codec = CompressionType.NONE) + val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, + offsetCounter = new LongRef(0), + now = now, + sourceCodec = NoCompressionCodec, + targetCodec = NoCompressionCodec, + messageFormatVersion = Record.MAGIC_VALUE_V1, + messageTimestampType = TimestampType.LOG_APPEND_TIME, + messageTimestampDiffMaxMs = 1000L) + val validatedRecords = validatedResults.validatedRecords + assertEquals("number of messages should not change", records.deepIterator.asScala.size, validatedRecords.deepIterator.asScala.size) + validatedRecords.deepIterator.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record)) + assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp) + assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp) + assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged) + } + + @Test + def testLogAppendTimeWithRecompression() { + val now = System.currentTimeMillis() + // The timestamps should be overwritten + val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP) + val validatedResults = LogValidator.validateMessagesAndAssignOffsets( + records, + offsetCounter = new LongRef(0), + now = now, + sourceCodec = DefaultCompressionCodec, + targetCodec = DefaultCompressionCodec, + messageFormatVersion = Record.MAGIC_VALUE_V1, + messageTimestampType = TimestampType.LOG_APPEND_TIME, + messageTimestampDiffMaxMs = 1000L) + val validatedRecords = validatedResults.validatedRecords + + assertEquals("number of messages should not change", records.deepIterator.asScala.size, validatedRecords.deepIterator.asScala.size) + validatedRecords.deepIterator.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record)) + assertTrue("MessageSet should still valid", validatedRecords.shallowIterator.next().record.isValid) + assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp) + assertEquals(s"The offset of max timestamp should be ${records.deepIterator.asScala.size - 1}", + records.deepIterator.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp) + assertTrue("Message size may have been changed", validatedResults.messageSizeMaybeChanged) + } + + @Test + def testLogAppendTimeWithoutRecompression() { + val now = System.currentTimeMillis() + // The timestamps should be overwritten + val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, + timestamp = 0L, codec = CompressionType.GZIP) + val validatedResults = LogValidator.validateMessagesAndAssignOffsets( + records, + offsetCounter = new LongRef(0), + now = now, + sourceCodec = DefaultCompressionCodec, + targetCodec = DefaultCompressionCodec, + messageFormatVersion = Record.MAGIC_VALUE_V1, + messageTimestampType = TimestampType.LOG_APPEND_TIME, + messageTimestampDiffMaxMs = 1000L) + val validatedRecords = validatedResults.validatedRecords + + assertEquals("number of messages should not change", records.deepIterator.asScala.size, + validatedRecords.deepIterator.asScala.size) + validatedRecords.deepIterator.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record)) + assertTrue("MessageSet should still valid", validatedRecords.shallowIterator.next().record.isValid) + assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp) + assertEquals(s"The offset of max timestamp should be ${records.deepIterator.asScala.size - 1}", + records.deepIterator.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp) + assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged) + } + + @Test + def testCreateTimeNonCompressed() { + val now = System.currentTimeMillis() + val timestampSeq = Seq(now - 1, now + 1, now) + val records = + MemoryRecords.withRecords(CompressionType.NONE, + Record.create(Record.MAGIC_VALUE_V1, timestampSeq(0), "hello".getBytes), + Record.create(Record.MAGIC_VALUE_V1, timestampSeq(1), "there".getBytes), + Record.create(Record.MAGIC_VALUE_V1, timestampSeq(2), "beautiful".getBytes)) + + val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records, + offsetCounter = new LongRef(0), + now = System.currentTimeMillis(), + sourceCodec = NoCompressionCodec, + targetCodec = NoCompressionCodec, + messageFormatVersion = Record.MAGIC_VALUE_V1, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 1000L) + val validatedRecords = validatingResults.validatedRecords + + var i = 0 + for (logEntry <- validatedRecords.deepIterator.asScala) { + logEntry.record.ensureValid() + assertEquals(logEntry.record.timestamp, timestampSeq(i)) + assertEquals(logEntry.record.timestampType, TimestampType.CREATE_TIME) + i += 1 + } + assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp) + assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.shallowOffsetOfMaxTimestamp) + assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged) + } + + @Test + def testCreateTimeCompressed() { + val now = System.currentTimeMillis() + val timestampSeq = Seq(now - 1, now + 1, now) + val records = + MemoryRecords.withRecords(CompressionType.GZIP, + Record.create(Record.MAGIC_VALUE_V1, timestampSeq(0), "hello".getBytes), + Record.create(Record.MAGIC_VALUE_V1, timestampSeq(1), "there".getBytes), + Record.create(Record.MAGIC_VALUE_V1, timestampSeq(2), "beautiful".getBytes)) + + val validatedResults = + LogValidator.validateMessagesAndAssignOffsets(records, + offsetCounter = new LongRef(0), + now = System.currentTimeMillis(), + sourceCodec = DefaultCompressionCodec, + targetCodec = DefaultCompressionCodec, + messageFormatVersion = Record.MAGIC_VALUE_V1, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 1000L) + val validatedRecords = validatedResults.validatedRecords + + var i = 0 + for (logEntry <- validatedRecords.deepIterator.asScala) { + logEntry.record.ensureValid() + assertEquals(logEntry.record.timestamp, timestampSeq(i)) + assertEquals(logEntry.record.timestampType, TimestampType.CREATE_TIME) + i += 1 + } + assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatedResults.maxTimestamp) + assertEquals(s"Offset of max timestamp should be ${validatedRecords.deepIterator.asScala.size - 1}", + validatedRecords.deepIterator.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp) + assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged) + } + + @Test(expected = classOf[InvalidTimestampException]) + def testInvalidCreateTimeNonCompressed() { + val now = System.currentTimeMillis() + val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now - 1001L, + codec = CompressionType.NONE) + LogValidator.validateMessagesAndAssignOffsets( + records, + offsetCounter = new LongRef(0), + now = System.currentTimeMillis(), + sourceCodec = NoCompressionCodec, + targetCodec = NoCompressionCodec, + messageFormatVersion = Record.MAGIC_VALUE_V1, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 1000L) + } + + @Test(expected = classOf[InvalidTimestampException]) + def testInvalidCreateTimeCompressed() { + val now = System.currentTimeMillis() + val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now - 1001L, + codec = CompressionType.GZIP) + LogValidator.validateMessagesAndAssignOffsets( + records, + offsetCounter = new LongRef(0), + now = System.currentTimeMillis(), + sourceCodec = DefaultCompressionCodec, + targetCodec = DefaultCompressionCodec, + messageFormatVersion = Record.MAGIC_VALUE_V1, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 1000L) + } + @Test + def testAbsoluteOffsetAssignmentNonCompressed() { + val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.NONE) + val offset = 1234567 + checkOffsets(records, 0) + checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + offsetCounter = new LongRef(offset), + now = System.currentTimeMillis(), + sourceCodec = NoCompressionCodec, + targetCodec = NoCompressionCodec, + messageFormatVersion = Record.MAGIC_VALUE_V0, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 1000L).validatedRecords, offset) + } + + @Test + def testAbsoluteOffsetAssignmentCompressed() { + val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP) + val offset = 1234567 + checkOffsets(records, 0) + checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + offsetCounter = new LongRef(offset), + now = System.currentTimeMillis(), + sourceCodec = DefaultCompressionCodec, + targetCodec = DefaultCompressionCodec, + messageFormatVersion = Record.MAGIC_VALUE_V0, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 1000L).validatedRecords, offset) + } + + @Test + def testRelativeOffsetAssignmentNonCompressed() { + val now = System.currentTimeMillis() + val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now, codec = CompressionType.NONE) + val offset = 1234567 + checkOffsets(records, 0) + val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records, + offsetCounter = new LongRef(offset), + now = System.currentTimeMillis(), + sourceCodec = NoCompressionCodec, + targetCodec = NoCompressionCodec, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 5000L).validatedRecords + checkOffsets(messageWithOffset, offset) + } + + @Test + def testRelativeOffsetAssignmentCompressed() { + val now = System.currentTimeMillis() + val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now, codec = CompressionType.GZIP) + val offset = 1234567 + checkOffsets(records, 0) + val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets( + records, + offsetCounter = new LongRef(offset), + now = System.currentTimeMillis(), + sourceCodec = DefaultCompressionCodec, + targetCodec = DefaultCompressionCodec, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 5000L).validatedRecords + checkOffsets(compressedMessagesWithOffset, offset) + } + + @Test + def testOffsetAssignmentAfterMessageFormatConversionV0NonCompressed() { + val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.NONE) + checkOffsets(records, 0) + val offset = 1234567 + checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + offsetCounter = new LongRef(offset), + now = System.currentTimeMillis(), + sourceCodec = NoCompressionCodec, + targetCodec = NoCompressionCodec, + messageFormatVersion = Record.MAGIC_VALUE_V1, + messageTimestampType = TimestampType.LOG_APPEND_TIME, + messageTimestampDiffMaxMs = 1000L).validatedRecords, offset) + } + + @Test + def testOffsetAssignmentAfterMessageFormatConversionV0Compressed() { + val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP) + val offset = 1234567 + checkOffsets(records, 0) + checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + offsetCounter = new LongRef(offset), + now = System.currentTimeMillis(), + sourceCodec = DefaultCompressionCodec, + targetCodec = DefaultCompressionCodec, + messageFormatVersion = Record.MAGIC_VALUE_V1, + messageTimestampType = TimestampType.LOG_APPEND_TIME, + messageTimestampDiffMaxMs = 1000L).validatedRecords, offset) + } + + @Test + def testOffsetAssignmentAfterMessageFormatConversionV1NonCompressed() { + val offset = 1234567 + val now = System.currentTimeMillis() + val records = createRecords(Record.MAGIC_VALUE_V1, now, codec = CompressionType.NONE) + checkOffsets(records, 0) + checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + offsetCounter = new LongRef(offset), + now = System.currentTimeMillis(), + sourceCodec = NoCompressionCodec, + targetCodec = NoCompressionCodec, + messageFormatVersion = Record.MAGIC_VALUE_V0, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 5000L).validatedRecords, offset) + } + + @Test + def testOffsetAssignmentAfterMessageFormatConversionV1Compressed() { + val offset = 1234567 + val now = System.currentTimeMillis() + val records = createRecords(Record.MAGIC_VALUE_V1, now, CompressionType.GZIP) + checkOffsets(records, 0) + checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, + offsetCounter = new LongRef(offset), + now = System.currentTimeMillis(), + sourceCodec = DefaultCompressionCodec, + targetCodec = DefaultCompressionCodec, + messageFormatVersion = Record.MAGIC_VALUE_V0, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 5000L).validatedRecords, offset) + } + + @Test(expected = classOf[InvalidRecordException]) + def testInvalidInnerMagicVersion(): Unit = { + val offset = 1234567 + val records = recordsWithInvalidInnerMagic(offset) + LogValidator.validateMessagesAndAssignOffsets(records, + offsetCounter = new LongRef(offset), + now = System.currentTimeMillis(), + sourceCodec = SnappyCompressionCodec, + targetCodec = SnappyCompressionCodec, + messageTimestampType = TimestampType.CREATE_TIME, + messageTimestampDiffMaxMs = 5000L) + } + + private def createRecords(magicValue: Byte = Message.CurrentMagicValue, + timestamp: Long = Message.NoTimestamp, + codec: CompressionType = CompressionType.NONE): MemoryRecords = { + if (magicValue == Record.MAGIC_VALUE_V0) { + MemoryRecords.withRecords( + codec, + Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "hello".getBytes), + Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "there".getBytes), + Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "beautiful".getBytes)) + } else { + MemoryRecords.withRecords( + codec, + Record.create(Record.MAGIC_VALUE_V1, timestamp, "hello".getBytes), + Record.create(Record.MAGIC_VALUE_V1, timestamp, "there".getBytes), + Record.create(Record.MAGIC_VALUE_V1, timestamp, "beautiful".getBytes)) + } + } + + /* check that offsets are assigned consecutively from the given base offset */ + private def checkOffsets(records: MemoryRecords, baseOffset: Long) { + assertTrue("Message set should not be empty", records.deepIterator.asScala.nonEmpty) + var offset = baseOffset + for (entry <- records.deepIterator.asScala) { + assertEquals("Unexpected offset in message set iterator", offset, entry.offset) + offset += 1 + } + } + + private def recordsWithInvalidInnerMagic(initialOffset: Long): MemoryRecords = { + val records = (0 until 20).map(id => + Record.create(Record.MAGIC_VALUE_V0, + Record.NO_TIMESTAMP, + id.toString.getBytes, + id.toString.getBytes)) + + val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16)) + val builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.GZIP, + TimestampType.CREATE_TIME) + + var offset = initialOffset + records.foreach { record => + builder.appendUnchecked(offset, record) + offset += 1 + } + + builder.build() + } + + def validateLogAppendTime(now: Long, record: Record) { + record.ensureValid() + assertEquals(s"Timestamp of message $record should be $now", now, record.timestamp) + assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType) + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala index 476a577..bd3ed68 100644 --- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala @@ -18,13 +18,11 @@ package kafka.message import java.nio.ByteBuffer -import java.nio.channels.{FileChannel, GatheringByteChannel} -import java.nio.file.StandardOpenOption +import java.nio.channels.GatheringByteChannel import org.junit.Assert._ import kafka.utils.TestUtils._ -import kafka.log.FileMessageSet -import kafka.utils.TestUtils +import org.apache.kafka.common.record.FileRecords import org.scalatest.junit.JUnitSuite import org.junit.Test @@ -94,7 +92,7 @@ trait BaseMessageSetTestCases extends JUnitSuite { @Test def testWriteToChannelThatConsumesPartially() { val bytesToConsumePerBuffer = 50 - val messages = (0 until 10).map(_ => new Message(TestUtils.randomString(100).getBytes)) + val messages = (0 until 10).map(_ => new Message(randomString(100).getBytes)) val messageSet = createMessageSet(messages) val messageSetSize = messageSet.sizeInBytes @@ -119,15 +117,15 @@ trait BaseMessageSetTestCases extends JUnitSuite { // do the write twice to ensure the message set is restored to its original state for (_ <- 0 to 1) { val file = tempFile() - val channel = FileChannel.open(file.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE) + val fileRecords = FileRecords.open(file, true) try { - val written = write(channel) + val written = write(fileRecords.channel) + fileRecords.resize() // resize since we wrote to the channel directly + assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written) - val newSet = new FileMessageSet(file, channel) - checkEquals(set.iterator, newSet.iterator) - } finally channel.close() + checkEquals(set.asRecords.deepIterator, fileRecords.deepIterator()) + } finally fileRecords.close() } } } - http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala index e2cfb87..5e22433 100644 --- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala @@ -17,13 +17,9 @@ package kafka.message -import java.io.DataOutputStream import java.nio._ -import kafka.common.LongRef import kafka.utils.TestUtils -import org.apache.kafka.common.errors.InvalidTimestampException -import org.apache.kafka.common.record.TimestampType import org.junit.Assert._ import org.junit.Test @@ -151,295 +147,6 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { assertEquals("second offset should be 2", 2L, iter.next().offset) } - @Test - def testLogAppendTime() { - val now = System.currentTimeMillis() - // The timestamps should be overwritten - val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = 0L, codec = NoCompressionCodec) - val compressedMessagesWithRecompresion = getMessages(magicValue = Message.MagicValue_V0, codec = DefaultCompressionCodec) - val compressedMessagesWithoutRecompression = - getMessages(magicValue = Message.MagicValue_V1, timestamp = 0L, codec = DefaultCompressionCodec) - - val validatingResults = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0), - now = now, - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, - messageFormatVersion = 1, - messageTimestampType = TimestampType.LOG_APPEND_TIME, - messageTimestampDiffMaxMs = 1000L) - val validatedMessages = validatingResults.validatedMessages - - val validatingCompressedMessagesResults = - compressedMessagesWithRecompresion.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0), - now = now, - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, - messageFormatVersion = 1, - messageTimestampType = TimestampType.LOG_APPEND_TIME, - messageTimestampDiffMaxMs = 1000L) - val validatedCompressedMessages = validatingCompressedMessagesResults.validatedMessages - - val validatingCompressedMessagesWithoutRecompressionResults = - compressedMessagesWithoutRecompression.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0), - now = now, - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, - messageFormatVersion = 1, - messageTimestampType = TimestampType.LOG_APPEND_TIME, - messageTimestampDiffMaxMs = 1000L) - - val validatedCompressedMessagesWithoutRecompression = validatingCompressedMessagesWithoutRecompressionResults.validatedMessages - - assertEquals("message set size should not change", messages.size, validatedMessages.size) - validatedMessages.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message)) - assertEquals(s"Max timestamp should be $now", now, validatingResults.maxTimestamp) - assertEquals(s"The offset of max timestamp should be 0", 0, validatingResults.offsetOfMaxTimestamp) - assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged) - - assertEquals("message set size should not change", compressedMessagesWithRecompresion.size, validatedCompressedMessages.size) - validatedCompressedMessages.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message)) - assertTrue("MessageSet should still valid", validatedCompressedMessages.shallowIterator.next().message.isValid) - assertEquals(s"Max timestamp should be $now", now, validatingCompressedMessagesResults.maxTimestamp) - assertEquals(s"The offset of max timestamp should be ${compressedMessagesWithRecompresion.size - 1}", - compressedMessagesWithRecompresion.size - 1, validatingCompressedMessagesResults.offsetOfMaxTimestamp) - assertTrue("Message size may have been changed", validatingCompressedMessagesResults.messageSizeMaybeChanged) - - assertEquals("message set size should not change", compressedMessagesWithoutRecompression.size, - validatedCompressedMessagesWithoutRecompression.size) - validatedCompressedMessagesWithoutRecompression.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message)) - assertTrue("MessageSet should still valid", validatedCompressedMessagesWithoutRecompression.shallowIterator.next().message.isValid) - assertEquals(s"Max timestamp should be $now", now, validatingCompressedMessagesWithoutRecompressionResults.maxTimestamp) - assertEquals(s"The offset of max timestamp should be ${compressedMessagesWithoutRecompression.size - 1}", - compressedMessagesWithoutRecompression.size - 1, validatingCompressedMessagesWithoutRecompressionResults.offsetOfMaxTimestamp) - assertFalse("Message size should not have been changed", validatingCompressedMessagesWithoutRecompressionResults.messageSizeMaybeChanged) - - def validateLogAppendTime(message: Message) { - message.ensureValid() - assertEquals(s"Timestamp of message $message should be $now", now, message.timestamp) - assertEquals(TimestampType.LOG_APPEND_TIME, message.timestampType) - } - } - - @Test - def testCreateTime() { - val now = System.currentTimeMillis() - val timestampSeq = Seq(now - 1, now + 1, now) - val messages = - new ByteBufferMessageSet(NoCompressionCodec, - new Message("hello".getBytes, timestamp = timestampSeq(0), magicValue = Message.MagicValue_V1), - new Message("there".getBytes, timestamp = timestampSeq(1), magicValue = Message.MagicValue_V1), - new Message("beautiful".getBytes, timestamp = timestampSeq(2), magicValue = Message.MagicValue_V1)) - val compressedMessages = - new ByteBufferMessageSet(DefaultCompressionCodec, - new Message("hello".getBytes, timestamp = timestampSeq(0), magicValue = Message.MagicValue_V1), - new Message("there".getBytes, timestamp = timestampSeq(1), magicValue = Message.MagicValue_V1), - new Message("beautiful".getBytes, timestamp = timestampSeq(2), magicValue = Message.MagicValue_V1)) - - val validatingResults = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0), - now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, - messageFormatVersion = 1, - messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 1000L) - val validatedMessages = validatingResults.validatedMessages - - val validatingCompressedMessagesResults = - compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0), - now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, - messageFormatVersion = 1, - messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 1000L) - val validatedCompressedMessages = validatingCompressedMessagesResults.validatedMessages - - var i = 0 - for (messageAndOffset <- validatedMessages) { - messageAndOffset.message.ensureValid() - assertEquals(messageAndOffset.message.timestamp, timestampSeq(i)) - assertEquals(messageAndOffset.message.timestampType, TimestampType.CREATE_TIME) - i += 1 - } - assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp) - assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.offsetOfMaxTimestamp) - assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged) - i = 0 - for (messageAndOffset <- validatedCompressedMessages) { - messageAndOffset.message.ensureValid() - assertEquals(messageAndOffset.message.timestamp, timestampSeq(i)) - assertEquals(messageAndOffset.message.timestampType, TimestampType.CREATE_TIME) - i += 1 - } - assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp) - assertEquals(s"Offset of max timestamp should be ${validatedCompressedMessages.size - 1}", - validatedCompressedMessages.size - 1, validatingCompressedMessagesResults.offsetOfMaxTimestamp) - assertFalse("Message size should not have been changed", validatingCompressedMessagesResults.messageSizeMaybeChanged) - } - - @Test - def testInvalidCreateTime() { - val now = System.currentTimeMillis() - val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now - 1001L, codec = NoCompressionCodec) - val compressedMessages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now - 1001L, codec = DefaultCompressionCodec) - - try { - messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0), - now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, - messageFormatVersion = 1, - messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 1000L) - fail("Should throw InvalidMessageException.") - } catch { - case _: InvalidTimestampException => - } - - try { - compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0), - now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, - messageFormatVersion = 1, - messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 1000L) - fail("Should throw InvalidMessageException.") - } catch { - case _: InvalidTimestampException => - } - } - - @Test - def testAbsoluteOffsetAssignment() { - val messages = getMessages(magicValue = Message.MagicValue_V0, codec = NoCompressionCodec) - val compressedMessages = getMessages(magicValue = Message.MagicValue_V0, codec = DefaultCompressionCodec) - // check uncompressed offsets - checkOffsets(messages, 0) - val offset = 1234567 - checkOffsets(messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset), - now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, - messageFormatVersion = 0, - messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 1000L).validatedMessages, offset) - - // check compressed messages - checkOffsets(compressedMessages, 0) - checkOffsets(compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset), - now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, - messageFormatVersion = 0, - messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 1000L).validatedMessages, offset) - - } - - @Test - def testRelativeOffsetAssignment() { - val now = System.currentTimeMillis() - val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = NoCompressionCodec) - val compressedMessages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = DefaultCompressionCodec) - - // check uncompressed offsets - checkOffsets(messages, 0) - val offset = 1234567 - val messageWithOffset = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset), - now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, - messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 5000L).validatedMessages - checkOffsets(messageWithOffset, offset) - - // check compressed messages - checkOffsets(compressedMessages, 0) - val compressedMessagesWithOffset = compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset), - now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, - messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 5000L).validatedMessages - checkOffsets(compressedMessagesWithOffset, offset) - } - - @Test(expected = classOf[InvalidMessageException]) - def testInvalidInnerMagicVersion(): Unit = { - val offset = 1234567 - val messages = messageSetWithInvalidInnerMagic(SnappyCompressionCodec, offset) - messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset), - now = System.currentTimeMillis(), - sourceCodec = SnappyCompressionCodec, - targetCodec = SnappyCompressionCodec, - messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 5000L).validatedMessages - } - - - @Test - def testOffsetAssignmentAfterMessageFormatConversion() { - // Check up conversion - val messagesV0 = getMessages(magicValue = Message.MagicValue_V0, codec = NoCompressionCodec) - val compressedMessagesV0 = getMessages(magicValue = Message.MagicValue_V0, codec = DefaultCompressionCodec) - // check uncompressed offsets - checkOffsets(messagesV0, 0) - val offset = 1234567 - checkOffsets(messagesV0.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset), - now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, - messageFormatVersion = 1, - messageTimestampType = TimestampType.LOG_APPEND_TIME, - messageTimestampDiffMaxMs = 1000L).validatedMessages, offset) - - // check compressed messages - checkOffsets(compressedMessagesV0, 0) - checkOffsets(compressedMessagesV0.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset), - now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, - messageFormatVersion = 1, - messageTimestampType = TimestampType.LOG_APPEND_TIME, - messageTimestampDiffMaxMs = 1000L).validatedMessages, offset) - - // Check down conversion - val now = System.currentTimeMillis() - val messagesV1 = getMessages(Message.MagicValue_V1, now, NoCompressionCodec) - val compressedMessagesV1 = getMessages(Message.MagicValue_V1, now, DefaultCompressionCodec) - - // check uncompressed offsets - checkOffsets(messagesV1, 0) - checkOffsets(messagesV1.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset), - now = System.currentTimeMillis(), - sourceCodec = NoCompressionCodec, - targetCodec = NoCompressionCodec, - messageFormatVersion = 0, - messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 5000L).validatedMessages, offset) - - // check compressed messages - checkOffsets(compressedMessagesV1, 0) - checkOffsets(compressedMessagesV1.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset), - now = System.currentTimeMillis(), - sourceCodec = DefaultCompressionCodec, - targetCodec = DefaultCompressionCodec, - messageFormatVersion = 0, - messageTimestampType = TimestampType.CREATE_TIME, - messageTimestampDiffMaxMs = 5000L).validatedMessages, offset) - } - - @Test - def testWriteFullyTo() { - checkWriteFullyToWithMessageSet(createMessageSet(Array[Message]())) - checkWriteFullyToWithMessageSet(createMessageSet(messages)) - } - - def checkWriteFullyToWithMessageSet(messageSet: ByteBufferMessageSet) { - checkWriteWithMessageSet(messageSet, messageSet.writeFullyTo) - } - /* check that offsets are assigned based on byte offset from the given base offset */ def checkOffsets(messages: ByteBufferMessageSet, baseOffset: Long) { assertTrue("Message set should not be empty", messages.nonEmpty) @@ -457,59 +164,4 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { assertTrue(shallowOffsets.subsetOf(deepOffsets)) } - private def getMessages(magicValue: Byte = Message.CurrentMagicValue, - timestamp: Long = Message.NoTimestamp, - codec: CompressionCodec = NoCompressionCodec): ByteBufferMessageSet = { - if (magicValue == Message.MagicValue_V0) { - new ByteBufferMessageSet( - codec, - new Message("hello".getBytes, Message.NoTimestamp, Message.MagicValue_V0), - new Message("there".getBytes, Message.NoTimestamp, Message.MagicValue_V0), - new Message("beautiful".getBytes, Message.NoTimestamp, Message.MagicValue_V0)) - } else { - new ByteBufferMessageSet( - codec, - new Message("hello".getBytes, timestamp = timestamp, magicValue = Message.MagicValue_V1), - new Message("there".getBytes, timestamp = timestamp, magicValue = Message.MagicValue_V1), - new Message("beautiful".getBytes, timestamp = timestamp, magicValue = Message.MagicValue_V1)) - } - } - - private def messageSetWithInvalidInnerMagic(codec: CompressionCodec, - initialOffset: Long): ByteBufferMessageSet = { - val messages = (0 until 20).map(id => - new Message(key = id.toString.getBytes, - bytes = id.toString.getBytes, - timestamp = Message.NoTimestamp, - magicValue = Message.MagicValue_V0)) - - val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16)) - var lastOffset = initialOffset - - messageWriter.write( - codec = codec, - timestamp = System.currentTimeMillis(), - timestampType = TimestampType.CREATE_TIME, - magicValue = Message.MagicValue_V1) { outputStream => - - val output = new DataOutputStream(CompressionFactory(codec, Message.MagicValue_V1, outputStream)) - try { - for (message <- messages) { - val innerOffset = lastOffset - initialOffset - output.writeLong(innerOffset) - output.writeInt(message.size) - output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) - lastOffset += 1 - } - } finally { - output.close() - } - } - val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead) - ByteBufferMessageSet.writeMessage(buffer, messageWriter, lastOffset - 1) - buffer.rewind() - - new ByteBufferMessageSet(buffer) - } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala index e8abfe1..5d2c8fb 100644 --- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala @@ -70,7 +70,7 @@ class MessageCompressionTest extends JUnitSuite { testCompressSize(GZIPCompressionCodec, messages, 396) if(isSnappyAvailable) - testCompressSize(SnappyCompressionCodec, messages, 502) + testCompressSize(SnappyCompressionCodec, messages, 1063) if(isLZ4Available) testCompressSize(LZ4CompressionCodec, messages, 387) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/message/MessageTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala index 5c02125..46c25af 100755 --- a/core/src/test/scala/unit/kafka/message/MessageTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala @@ -48,7 +48,7 @@ class MessageTest extends JUnitSuite { val magicValues = Array(Message.MagicValue_V0, Message.MagicValue_V1) for(k <- keys; v <- vals; codec <- codecs; t <- timestamps; mv <- magicValues) { val timestamp = ensureValid(mv, t) - messages += new MessageTestVal(k, v, codec, timestamp, mv, new Message(v, k, timestamp, codec, mv)) + messages += MessageTestVal(k, v, codec, timestamp, mv, new Message(v, k, timestamp, codec, mv)) } def ensureValid(magicValue: Byte, timestamp: Long): Long = @@ -96,7 +96,7 @@ class MessageTest extends JUnitSuite { @Test def testEquality() { - for(v <- messages) { + for (v <- messages) { assertFalse("Should not equal null", v.message.equals(null)) assertFalse("Should not equal a random string", v.message.equals("asdf")) assertTrue("Should equal itself", v.message.equals(v.message)) @@ -105,40 +105,6 @@ class MessageTest extends JUnitSuite { } } - @Test - def testMessageFormatConversion() { - - def convertAndVerify(v: MessageTestVal, fromMessageFormat: Byte, toMessageFormat: Byte) { - assertEquals("Message should be the same when convert to the same version.", - v.message.toFormatVersion(fromMessageFormat), v.message) - val convertedMessage = v.message.toFormatVersion(toMessageFormat) - assertEquals("Size difference is not expected value", convertedMessage.size - v.message.size, - Message.headerSizeDiff(fromMessageFormat, toMessageFormat)) - assertTrue("Message should still be valid", convertedMessage.isValid) - assertEquals("Timestamp should be NoTimestamp", convertedMessage.timestamp, Message.NoTimestamp) - assertEquals(s"Magic value should be $toMessageFormat now", convertedMessage.magic, toMessageFormat) - if (convertedMessage.hasKey) - assertEquals("Message key should not change", convertedMessage.key, ByteBuffer.wrap(v.key)) - else - assertNull(convertedMessage.key) - if(v.payload == null) { - assertTrue(convertedMessage.isNull) - assertEquals("Payload should be null", null, convertedMessage.payload) - } else { - assertEquals("Message payload should not change", convertedMessage.payload, ByteBuffer.wrap(v.payload)) - } - assertEquals("Compression codec should not change", convertedMessage.compressionCodec, v.codec) - } - - for (v <- messages) { - if (v.magicValue == Message.MagicValue_V0) { - convertAndVerify(v, Message.MagicValue_V0, Message.MagicValue_V1) - } else if (v.magicValue == Message.MagicValue_V1) { - convertAndVerify(v, Message.MagicValue_V1, Message.MagicValue_V0) - } - } - } - @Test(expected = classOf[IllegalArgumentException]) def testInvalidTimestampAndMagicValueCombination() { new Message("hello".getBytes, 0L, Message.MagicValue_V0) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index 7d6ad91..5c9f035 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -19,17 +19,17 @@ package kafka.server import com.yammer.metrics.Metrics import kafka.cluster.BrokerEndPoint -import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec} import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData} import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.record.{MemoryRecords, Record} import org.junit.Assert.{assertFalse, assertTrue} import org.junit.{Before, Test} import scala.collection.JavaConverters._ -import scala.collection.{mutable, Map} +import scala.collection.{Map, mutable} class AbstractFetcherThreadTest { @@ -91,10 +91,10 @@ class AbstractFetcherThreadTest { override def offset(topicAndPartition: TopicPartition): Long = offsets(topicAndPartition) } - class TestPartitionData(byteBufferMessageSet: ByteBufferMessageSet) extends PartitionData { + class TestPartitionData(records: MemoryRecords = MemoryRecords.EMPTY) extends PartitionData { override def errorCode: Short = Errors.NONE.code - override def toByteBufferMessageSet: ByteBufferMessageSet = byteBufferMessageSet + override def toRecords: MemoryRecords = records override def highWatermark: Long = 0L @@ -119,7 +119,7 @@ class AbstractFetcherThreadTest { override def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]): Unit = {} override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, TestPartitionData)] = - fetchRequest.offsets.mapValues(_ => new TestPartitionData(new ByteBufferMessageSet())).toSeq + fetchRequest.offsets.mapValues(_ => new TestPartitionData()).toSeq override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest = new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.offset) }.toMap) @@ -156,8 +156,8 @@ class AbstractFetcherThreadTest { @volatile var fetchCount = 0 private val normalPartitionDataSet = List( - new TestPartitionData(new ByteBufferMessageSet(NoCompressionCodec, Seq(0L), new Message("hello".getBytes))), - new TestPartitionData(new ByteBufferMessageSet(NoCompressionCodec, Seq(1L), new Message("hello".getBytes))) + new TestPartitionData(MemoryRecords.withRecords(0L, Record.create("hello".getBytes()))), + new TestPartitionData(MemoryRecords.withRecords(1L, Record.create("hello".getBytes()))) ) override def processPartitionData(topicAndPartition: TopicPartition, @@ -170,10 +170,10 @@ class AbstractFetcherThreadTest { .format(topicAndPartition, fetchOffset, logEndOffset)) // Now check message's crc - val messages = partitionData.toByteBufferMessageSet - for (messageAndOffset <- messages.shallowIterator) { - messageAndOffset.message.ensureValid() - logEndOffset = messageAndOffset.nextOffset + val records = partitionData.toRecords + for (entry <- records.shallowIterator.asScala) { + entry.record.ensureValid() + logEndOffset = entry.nextOffset } } @@ -181,12 +181,12 @@ class AbstractFetcherThreadTest { fetchCount += 1 // Set the first fetch to get a corrupted message if (fetchCount == 1) { - val corruptedMessage = new Message("hello".getBytes) - val badChecksum = (corruptedMessage.checksum + 1 % Int.MaxValue).toInt + val corruptedRecord = Record.create("hello".getBytes()) + val badChecksum = (corruptedRecord.checksum + 1 % Int.MaxValue).toInt // Garble checksum - Utils.writeUnsignedInt(corruptedMessage.buffer, Message.CrcOffset, badChecksum) - val byteBufferMessageSet = new ByteBufferMessageSet(NoCompressionCodec, corruptedMessage) - fetchRequest.offsets.mapValues(_ => new TestPartitionData(byteBufferMessageSet)).toSeq + Utils.writeUnsignedInt(corruptedRecord.buffer, Record.CRC_OFFSET, badChecksum) + val records = MemoryRecords.withRecords(corruptedRecord) + fetchRequest.offsets.mapValues(_ => new TestPartitionData(records)).toSeq } else // Then, the following fetches get the normal data fetchRequest.offsets.mapValues(v => normalPartitionDataSet(v.toInt)).toSeq
