Incorrect index in the log of a follower; patched by Jun Rao; reviewed by Neha Narkhede and Jay Kreps; kafka-804
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/46ebdc16
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/46ebdc16
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/46ebdc16

Branch: refs/heads/trunk
Commit: 46ebdc16e99c4bb9236a6c25c47ff2c97fdc0f53
Parents: d40d255
Author: Jun Rao <jun...@gmail.com>
Authored: Mon Mar 18 15:53:03 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Mar 18 15:53:03 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala | 30 ++++++++--------
 core/src/test/scala/unit/kafka/log/LogTest.scala | 8 ++--
 2 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/46ebdc16/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f6ee475..7d71451 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -270,33 +270,32 @@ private[kafka] class Log(val dir: File,
 
       try {
         // they are valid, insert them in the log
-        val offsetsAndNumAppendedMessages = lock synchronized {
+        val offsets = lock synchronized {
           val firstOffset = nextOffset.get
 
           // maybe roll the log if this segment is full
           val segment = maybeRoll(segments.view.last)
 
           // assign offsets to the messageset
-          val offsets =
+          val lastOffset =
             if(assignOffsets) {
               val offsetCounter = new AtomicLong(nextOffset.get)
-              val firstOffset = offsetCounter.get
               try {
                 validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
               } catch {
                 case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
               }
-              val lastOffset = offsetCounter.get - 1
-              val numMessages = lastOffset - firstOffset + 1
+              val assignedLastOffset = offsetCounter.get - 1
+              val numMessages = assignedLastOffset - firstOffset + 1
               BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(numMessages)
               BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numMessages)
-              (firstOffset, lastOffset)
+              assignedLastOffset
             } else {
               require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages)
               require(messageSetInfo.firstOffset >= nextOffset.get,
                       "Attempt to append a message set beginning with offset %d to a log with log end offset %d."
                       .format(messageSetInfo.firstOffset, nextOffset.get))
-              (messageSetInfo.firstOffset, messageSetInfo.lastOffset)
+              messageSetInfo.lastOffset
             }
 
           // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
@@ -308,23 +307,24 @@ private[kafka] class Log(val dir: File,
           }
 
           // now append to the log
-          trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s"
-                .format(this.name, offsets._1, nextOffset.get(), validMessages))
-          segment.append(offsets._1, validMessages)
+          segment.append(firstOffset, validMessages)
 
           // advance the log end offset
-          nextOffset.set(offsets._2 + 1)
-          val numAppendedMessages = (nextOffset.get - firstOffset).toInt
+          nextOffset.set(lastOffset + 1)
+
+          trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
+                  .format(this.name, firstOffset, nextOffset.get(), validMessages))
 
           // return the offset at which the messages were appended
-          (offsets._1, offsets._2, numAppendedMessages)
+          (firstOffset, lastOffset)
         }
 
         // maybe flush the log and index
-        maybeFlush(offsetsAndNumAppendedMessages._3)
+        val numAppendedMessages = (offsets._2 - offsets._1 + 1).toInt
+        maybeFlush(numAppendedMessages)
 
         // return the first and last offset
-        (offsetsAndNumAppendedMessages._1, offsetsAndNumAppendedMessages._2)
+        offsets
       } catch {
         case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/46ebdc16/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 786ae03..4ed88e8 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -379,15 +379,15 @@ class LogTest extends JUnitSuite {
                       needsRecovery = true)
     val messages = List("one", "two", "three", "four", "five", "six")
     val ms = new ByteBufferMessageSet(compressionCodec = codec,
-                                      offsetCounter = new AtomicLong(5),
+                                      offsetCounter = new AtomicLong(0),
                                       messages = messages.map(s => new Message(s.getBytes)):_*)
-    val firstOffset = ms.shallowIterator.toList.head.offset
-    val lastOffset = ms.shallowIterator.toList.last.offset
+    val firstOffset = ms.toList.head.offset
+    val lastOffset = ms.toList.last.offset
     val (first, last) = log.append(ms, assignOffsets = false)
     assertEquals(last + 1, log.logEndOffset)
     assertEquals(firstOffset, first)
     assertEquals(lastOffset, last)
-    assertTrue(log.read(5, 64*1024).size > 0)
+    assertTrue(log.read(0, 64*1024).size > 0)
     log.delete()
   }
 }