Repository: kafka
Updated Branches:
  refs/heads/trunk 693d4ca1a -> f1110c3fb
KAFKA-2477: Fix a race condition between log append and fetch that causes OffsetOutOfRangeException.

Tried two fixes. I prefer the second approach because it saves an additional offset search.

Author: Jiangjie Qin <becket....@gmail.com>

Reviewers: Guozhang Wang <wangg...@gmail.com>, Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com>

Closes #204 from becketqin/KAFKA-2477

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f1110c3f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f1110c3f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f1110c3f

Branch: refs/heads/trunk
Commit: f1110c3fbb166f94204b6bb18bc4e1a9100d3c4e
Parents: 693d4ca
Author: Jiangjie Qin <becket....@gmail.com>
Authored: Wed Oct 7 21:59:14 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Wed Oct 7 21:59:14 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala        | 39 ++++++++++++++++-----
 core/src/main/scala/kafka/log/LogSegment.scala |  9 ++---
 2 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f1110c3f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e5e8007..02205c9 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -470,22 +470,41 @@ class Log(val dir: File,
   def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
     trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
 
-    // check if the offset is valid and in range
-    val next = nextOffsetMetadata.messageOffset
+    // Because we don't use lock for reading, the synchronization is a little bit tricky.
+    // We create the local variables to avoid race conditions with updates to the log.
+    val currentNextOffsetMetadata = nextOffsetMetadata
+    val next = currentNextOffsetMetadata.messageOffset
     if(startOffset == next)
-      return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
-
+      return FetchDataInfo(currentNextOffsetMetadata, MessageSet.Empty)
+
     var entry = segments.floorEntry(startOffset)
-
+
     // attempt to read beyond the log end offset is an error
     if(startOffset > next || entry == null)
       throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
 
-    // do the read on the segment with a base offset less than the target offset
+    // Do the read on the segment with a base offset less than the target offset
     // but if that segment doesn't contain any messages with an offset greater than that
     // continue to read from successive segments until we get some messages or we reach the end of the log
     while(entry != null) {
-      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)
+      // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
+      // the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
+      // cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log
+      // end of the active segment.
+      val maxPosition = {
+        if (entry == segments.lastEntry) {
+          val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
+          // Check the segment again in case a new segment has just rolled out.
+          if (entry != segments.lastEntry)
+            // New log segment has rolled out, we can read up to the file end.
+            entry.getValue.size
+          else
+            exposedPos
+        } else {
+          entry.getValue.size
+        }
+      }
+      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition)
       if(fetchInfo == null) {
         entry = segments.higherEntry(entry.getKey)
       } else {
@@ -622,12 +641,14 @@ class Log(val dir: File,
       val prev = addSegment(segment)
       if(prev != null)
         throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
-
+      // We need to update the segment base offset and append position data of the metadata when log rolls.
+      // The next offset should not change.
+      updateLogEndOffset(nextOffsetMetadata.messageOffset)
       // schedule an asynchronous flush of the old segment
       scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
 
       info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))
-
+
       segment
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1110c3f/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 1377e8f..4de4c2b 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -112,12 +112,13 @@ class LogSegment(val log: FileMessageSet,
    * @param startOffset A lower bound on the first offset to include in the message set we read
    * @param maxSize The maximum number of bytes to include in the message set we read
    * @param maxOffset An optional maximum offset for the message set we read
+   * @param maxPosition An optional maximum position in the log segment that should be exposed for read.
    *
    * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
    *         or null if the startOffset is larger than the largest offset in this log
    */
   @threadsafe
-  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
+  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo = {
     if(maxSize < 0)
       throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
 
@@ -138,8 +139,8 @@ class LogSegment(val log: FileMessageSet,
 
     val length = maxOffset match {
       case None =>
-        // no max offset, just use the max size they gave unmolested
-        maxSize
+        // no max offset, just read until the max position
+        min((maxPosition - startPosition.position).toInt, maxSize)
       case Some(offset) => {
         // there is a max offset, translate it to a file position and use that to calculate the max read size
         if(offset < startOffset)
@@ -150,7 +151,7 @@
           logSize // the max offset is off the end of the log, use the end of the file
         else
           mapping.position
-        min(endPosition - startPosition.position, maxSize)
+        min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
       }
     }
     FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
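
The heart of the fix is a publish-then-read protocol: an append first writes bytes into the active segment and only afterwards updates nextOffsetMetadata, so a lock-free reader must take a single snapshot of that metadata and cap its read at the snapshot's exposed position rather than at the physical end of the file. Below is a minimal, self-contained Scala sketch of that pattern; the names (ExposedLog, append, read) are invented for illustration and are not Kafka's API.

    import java.util.concurrent.atomic.AtomicReference

    // Hypothetical stand-in for an active segment plus its exposed end metadata;
    // invented names, not Kafka's actual classes.
    final class ExposedLog {
      private val buffer = new java.io.ByteArrayOutputStream()
      // (nextOffset, exposedPosition), published atomically *after* each append;
      // this plays the role of nextOffsetMetadata in the real code.
      private val exposed = new AtomicReference[(Long, Int)]((0L, 0))

      def append(record: Array[Byte]): Unit = synchronized {
        buffer.write(record)                       // bytes become physically present...
        val (nextOffset, _) = exposed.get
        exposed.set((nextOffset + 1, buffer.size)) // ...but readable only after this publish
      }

      def read(startPosition: Int, maxBytes: Int): Array[Byte] = {
        // One snapshot per read, like `val currentNextOffsetMetadata = nextOffsetMetadata`.
        val (_, maxPosition) = exposed.get
        // Cap at the exposed position, not at the buffer's current physical size:
        // a concurrent append may have written bytes it has not yet published.
        val length = math.min(maxPosition - startPosition, maxBytes)
        if (length <= 0) Array.emptyByteArray
        else java.util.Arrays.copyOfRange(buffer.toByteArray, startPosition, startPosition + length)
      }
    }

Without the cap, a fetch arriving between the segment write and the metadata update could return bytes past the published log end; the follower's next fetch would then use an offset greater than nextOffsetMetadata.messageOffset and hit exactly the OffsetOutOfRangeException this patch eliminates.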
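
To see what the LogSegment.read change does to the computed read length, here is a small worked example with made-up positions (none of these numbers come from the patch): suppose startOffset maps to file position 500, maxOffset maps to endPosition 900, the active segment's exposed position is 700, and maxSize is 1000.

    // Hypothetical numbers: startPosition from startOffset, endPosition from maxOffset,
    // maxPosition = exposed position of the active segment.
    val startPosition = 500L
    val endPosition   = 900L
    val maxPosition   = 700L
    val maxSize       = 1000L
    val oldLength = math.min(endPosition - startPosition, maxSize)                        // 400, may include unexposed bytes
    val newLength = math.min(math.min(maxPosition, endPosition) - startPosition, maxSize) // 200, stops at the exposed position
    assert(oldLength == 400L && newLength == 200L)

The extra min against maxPosition (applied in both the None and Some branches of the maxOffset match) is the whole of the segment-side fix; the rest of the LogSegment change is plumbing the new parameter through.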