Repository: kafka Updated Branches: refs/heads/0.8.2 5fcc3747f -> db0d76b5d
kafka-1670; Corrupt log files for segment.bytes values close to Int.MaxInt; patched by Sriharsha Chintalapani; reviewed by Jay Kreps and Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/db0d76b5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/db0d76b5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/db0d76b5 Branch: refs/heads/0.8.2 Commit: db0d76b5d1facbac5deab8a9941e6439bc30a2e3 Parents: 5fcc374 Author: Sriharsha Chintalapani <[email protected]> Authored: Thu Oct 9 08:08:20 2014 -0700 Committer: Jun Rao <[email protected]> Committed: Thu Oct 9 08:08:20 2014 -0700 ---------------------------------------------------------------------- .../errors/RecordBatchTooLargeException.java | 39 ++++++++++++++++++++ .../apache/kafka/common/protocol/Errors.java | 3 +- .../main/scala/kafka/common/ErrorMapping.scala | 5 ++- .../MessageSetSizeTooLargeException.scala | 22 +++++++++++ core/src/main/scala/kafka/log/Log.scala | 27 +++++++++++--- .../scala/unit/kafka/log/LogManagerTest.scala | 2 +- .../src/test/scala/unit/kafka/log/LogTest.scala | 34 ++++++++++++----- .../scala/unit/kafka/server/LogOffsetTest.scala | 8 ++-- 8 files changed, 117 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/db0d76b5/clients/src/main/java/org/apache/kafka/common/errors/RecordBatchTooLargeException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RecordBatchTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/RecordBatchTooLargeException.java new file mode 100644 index 0000000..f3f3f27 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/RecordBatchTooLargeException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * This record batch is larger than the maximum allowable size + */ +public class RecordBatchTooLargeException extends ApiException { + + private static final long serialVersionUID = 1L; + + public RecordBatchTooLargeException() { + super(); + } + + public RecordBatchTooLargeException(String message, Throwable cause) { + super(message, cause); + } + + public RecordBatchTooLargeException(String message) { + super(message); + } + + public RecordBatchTooLargeException(Throwable cause) { + super(cause); + } + +} + http://git-wip-us.apache.org/repos/asf/kafka/blob/db0d76b5/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index d434f42..d5f5de3 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -43,7 +43,8 @@ public enum Errors { OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")), NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")), // TODO: errorCode 14, 15, 16 - INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")); + INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")), + RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")); private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>(); private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/db0d76b5/core/src/main/scala/kafka/common/ErrorMapping.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 3fae791..a190607 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -19,7 +19,6 @@ package kafka.common import kafka.message.InvalidMessageException import java.nio.ByteBuffer -import java.lang.Throwable import scala.Predef._ /** @@ -47,6 +46,7 @@ object ErrorMapping { val ConsumerCoordinatorNotAvailableCode: Short = 15 val NotCoordinatorForConsumerCode: Short = 16 val InvalidTopicCode : Short = 17 + val MessageSetSizeTooLargeCode: Short = 18 private val exceptionToCode = Map[Class[Throwable], Short]( @@ -65,7 +65,8 @@ object ErrorMapping { classOf[OffsetsLoadInProgressException].asInstanceOf[Class[Throwable]] -> OffsetsLoadInProgressCode, classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode, classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode, - classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode + classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode, + classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode ).withDefaultValue(UnknownCode) /* invert the mapping */ http://git-wip-us.apache.org/repos/asf/kafka/blob/db0d76b5/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala b/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala new file mode 100644 index 0000000..94a616e --- /dev/null +++ b/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +class MessageSetSizeTooLargeException(message: String) extends RuntimeException(message) { + def this() = this(null) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/db0d76b5/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 0ddf97b..a123cdc 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -252,9 +252,6 @@ class Log(val dir: File, lock synchronized { appendInfo.firstOffset = nextOffsetMetadata.messageOffset - // maybe roll the log if this segment is full - val segment = maybeRoll() - if(assignOffsets) { // assign offsets to the message set val offset = new AtomicLong(nextOffsetMetadata.messageOffset) @@ -282,6 +279,16 @@ class Log(val dir: File, } } + // check messages set size may be exceed config.segmentSize + if(validMessages.sizeInBytes > config.segmentSize) { + throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d." + .format(validMessages.sizeInBytes, config.segmentSize)) + } + + + // maybe roll the log if this segment is full + val segment = maybeRoll(validMessages.sizeInBytes) + // now append to the log segment.append(appendInfo.firstOffset, validMessages) @@ -489,12 +496,20 @@ class Log(val dir: File, def logEndOffset: Long = nextOffsetMetadata.messageOffset /** - * Roll the log over to a new empty log segment if necessary + * Roll the log over to a new empty log segment if necessary. + * + * @param messagesSize The messages set size in bytes + * logSegment will be rolled if one of the following conditions met + * <ol> + * <li> The logSegment is full + * <li> The maxTime has elapsed + * <li> The index is full + * </ol> * @return The currently active segment after (perhaps) rolling to a new segment */ - private def maybeRoll(): LogSegment = { + private def maybeRoll(messagesSize: Int): LogSegment = { val segment = activeSegment - if (segment.size > config.segmentSize || + if (segment.size > config.segmentSize - messagesSize || segment.size > 0 && time.milliseconds - segment.created > config.segmentMs || segment.index.isFull) { debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)." http://git-wip-us.apache.org/repos/asf/kafka/blob/db0d76b5/core/src/test/scala/unit/kafka/log/LogManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 59bd8a9..90cd530 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -114,7 +114,7 @@ class LogManagerTest extends JUnit3Suite { val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes logManager.shutdown() - val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L) + val config = logConfig.copy(segmentSize = 10 * setSize, retentionSize = 5L * 10L * setSize + 10L) logManager = createLogManager() logManager.startup http://git-wip-us.apache.org/repos/asf/kafka/blob/db0d76b5/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 577d102..a0cbd3b 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -18,15 +18,13 @@ package kafka.log import java.io._ -import java.util.ArrayList import java.util.concurrent.atomic._ import junit.framework.Assert._ import org.scalatest.junit.JUnitSuite import org.junit.{After, Before, Test} import kafka.message._ -import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException} +import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException, MessageSetSizeTooLargeException} import kafka.utils._ -import scala.Some import kafka.server.KafkaConfig class LogTest extends JUnitSuite { @@ -239,7 +237,7 @@ class LogTest extends JUnitSuite { @Test def testCompressedMessages() { /* this log should roll after every messageset */ - val log = new Log(logDir, logConfig.copy(segmentSize = 10), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time) /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) @@ -286,7 +284,26 @@ class LogTest extends JUnitSuite { } /** - * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the + * MessageSet size shouldn't exceed the config.segmentSize, check that it is properly enforced by + * appending a message set larger than the config.segmentSize setting and checking that an exception is thrown. + */ + @Test + def testMessageSetSizeCheck() { + val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes)) + // append messages to log + val configSegmentSize = messageSet.sizeInBytes - 1 + val log = new Log(logDir, logConfig.copy(segmentSize = configSegmentSize), recoveryPoint = 0L, time.scheduler, time = time) + + try { + log.append(messageSet) + fail("message set should throw MessageSetSizeTooLargeException.") + } catch { + case e: MessageSetSizeTooLargeException => // this is good + } + } + + /** + * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the * setting and checking that an exception is thrown. */ @Test @@ -305,10 +322,9 @@ class LogTest extends JUnitSuite { log.append(second) fail("Second message set should throw MessageSizeTooLargeException.") } catch { - case e: MessageSizeTooLargeException => // this is good + case e: MessageSizeTooLargeException => // this is good } } - /** * Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly. */ @@ -375,7 +391,7 @@ class LogTest extends JUnitSuite { val set = TestUtils.singleMessageSet("test".getBytes()) val setSize = set.sizeInBytes val msgPerSeg = 10 - val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages + val segmentSize = msgPerSeg * setSize // each segment will be 10 messages // create a log val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, scheduler = time.scheduler, time = time) @@ -429,7 +445,7 @@ class LogTest extends JUnitSuite { val set = TestUtils.singleMessageSet("test".getBytes()) val setSize = set.sizeInBytes val msgPerSeg = 10 - val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages + val segmentSize = msgPerSeg * setSize // each segment will be 10 messages val config = logConfig.copy(segmentSize = segmentSize) val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) http://git-wip-us.apache.org/repos/asf/kafka/blob/db0d76b5/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 9556ed9..c06ee75 100644 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -92,7 +92,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { log.flush() val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10) - assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets) + assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0), offsets) waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected") val topicAndPartition = TopicAndPartition(topic, part) @@ -101,7 +101,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { replicaId = 0) val consumerOffsets = simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets - assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets) + assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0), consumerOffsets) // try to fetch using latest offset val fetchResponse = simpleConsumer.fetch( @@ -155,14 +155,14 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10) - assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets) + assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), offsets) waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected") val topicAndPartition = TopicAndPartition(topic, part) val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0) val consumerOffsets = simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets - assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets) + assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), consumerOffsets) } @Test
