Updated Branches: refs/heads/0.8 3696d7281 -> de1a4d727
KAFKA-698 Avoid advancing the log end offset until the append has actually happened since reads may be happening in the meantime.

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/de1a4d72
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/de1a4d72
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/de1a4d72

Branch: refs/heads/0.8
Commit: de1a4d727693c39be19fd9db427746fa6c8a4a12
Parents: 3696d72
Author: Jay Kreps <jay.kr...@gmail.com>
Authored: Mon Jan 14 09:29:52 2013 -0800
Committer: Jay Kreps <jay.kr...@gmail.com>
Committed: Tue Jan 15 15:43:10 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala | 11 +++++++----
 1 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/de1a4d72/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 79db610..560be19 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -269,14 +269,14 @@ private[kafka] class Log(val dir: File,
           // assign offsets to the messageset
           val offsets =
             if(assignOffsets) {
-              val firstOffset = nextOffset.get
-              validMessages = validMessages.assignOffsets(nextOffset, messageSetInfo.codec)
-              val lastOffset = nextOffset.get - 1
+              val offsetCounter = new AtomicLong(nextOffset.get)
+              val firstOffset = offsetCounter.get
+              validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
+              val lastOffset = offsetCounter.get - 1
               (firstOffset, lastOffset)
             } else {
               if(!messageSetInfo.offsetsMonotonic)
                 throw new IllegalArgumentException("Out of order offsets found in " + messages)
-              nextOffset.set(messageSetInfo.lastOffset + 1)
               (messageSetInfo.firstOffset, messageSetInfo.lastOffset)
             }
 
@@ -285,6 +285,9 @@ private[kafka] class Log(val dir: File,
                 .format(this.name, offsets._1, nextOffset.get(), validMessages))
           segment.append(offsets._1, validMessages)
 
+          // advance the log end offset
+          nextOffset.set(offsets._2 + 1)
+
           // return the offset at which the messages were appended
           offsets
         }