[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Schiff updated KAFKA-3323:
----------------------------------
    Attachment: log_dump.txt
                index_dump.txt

Index and log dumps from segment 0 of partition 0 of a compacted topic. The index file is very small and is quite obviously incorrect. The first incorrect entry is
{code}
offset: -1733149320 position: 1307775
{code}
The corresponding entry in the log dump shows an offset of {code}6856785272L{code}. This value overflows to the indexed offset when narrowed to an Int:
{code}
scala> 6856785272L.toInt
res1: Int = -1733149320
{code}
Two illustrative sketches (one for the truncation, one for the segment grouping) follow the quoted description below.

> Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3323
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3323
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8.1.1
>            Reporter: Michael Schiff
>            Assignee: Jay Kreps
>         Attachments: index_dump.txt, log_dump.txt
>
>
> {code}
> /**
>  * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all previous entries.
>  */
> def append(offset: Long, position: Int) {
>   inLock(lock) {
>     require(!isFull, "Attempt to append to a full index (size = " + size + ").")
>     if (size.get == 0 || offset > lastOffset) {
>       debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
>       this.mmap.putInt((offset - baseOffset).toInt)
>       this.mmap.putInt(position)
>       this.size.incrementAndGet()
>       this.lastOffset = offset
>       require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
>     } else {
>       throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
>         .format(offset, entries, lastOffset, file.getAbsolutePath))
>     }
>   }
> }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as an integer without overflow. If the LogSegment is from a compacted topic, this assumption may not be valid. The result is a quiet integer overflow, which stores a negative value into the index. This breaks the binary search used to look up offset positions: large intervals of offsets are skipped by consumers who are bootstrapping themselves on the topic.
> I believe the issue is caused by the LogCleaner, specifically by the groupings produced by
> {code}
> /**
>  * Group the segments in a log into groups totaling less than a given size. The size is enforced separately for the log data and the index data.
>  * We collect a group of such segments together into a single
>  * destination segment. This prevents segment sizes from shrinking too much.
>  *
>  * @param segments The log segments to group
>  * @param maxSize the maximum size in bytes for the total of all log data in a group
>  * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
>  *
>  * @return A list of grouped segments
>  */
> private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is concerned only with grouping by size, without taking a group's base offset and maximum offset into account, it will produce groups that, when cleaned into a single segment, have relative offsets that overflow. This is more likely for topics with low key cardinality but high update volume, as you can wind up with very few cleaned records that nevertheless carry very high offsets.
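A minimal sketch of a guard that would turn the silent truncation in OffsetIndex.append into a fast failure. The helper toRelative is hypothetical, not part of the Kafka codebase; it only illustrates rejecting relative offsets that do not fit in an Int:
{code}
// Hypothetical helper (not the actual Kafka fix): compute the relative
// offset and fail fast instead of letting .toInt wrap silently.
def toRelative(offset: Long, baseOffset: Long): Int = {
  val relative = offset - baseOffset
  require(relative >= 0 && relative <= Int.MaxValue,
    "offset %d out of range for base offset %d".format(offset, baseOffset))
  relative.toInt
}

// With the values from the dumps above (segment 0, so base offset 0):
// toRelative(6856785272L, 0L) now throws instead of indexing -1733149320.
{code}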
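And a sketch of a size-and-offset-aware grouping pass, assuming a hypothetical SegmentInfo stand-in for LogSegment with only the fields needed here. The extra bound starts a new group whenever merging the next segment would push (lastOffset - baseOffset) of the merged segment past Int.MaxValue:
{code}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for kafka.log.LogSegment.
case class SegmentInfo(baseOffset: Long, lastOffset: Long,
                       logSize: Int, indexSize: Int)

def groupSegments(segments: Seq[SegmentInfo],
                  maxSize: Long, maxIndexSize: Long): List[List[SegmentInfo]] = {
  val groups = ListBuffer.empty[List[SegmentInfo]]
  var current = List.empty[SegmentInfo] // built in reverse order
  var base, logSize, indexSize = 0L
  for (seg <- segments) {
    val fits = current.nonEmpty &&
      logSize + seg.logSize <= maxSize &&
      indexSize + seg.indexSize <= maxIndexSize &&
      seg.lastOffset - base <= Int.MaxValue // the extra offset-range bound
    if (current.nonEmpty && !fits) { // close the current group
      groups += current.reverse
      current = Nil
    }
    if (current.isEmpty) {
      base = seg.baseOffset
      logSize = 0L
      indexSize = 0L
    }
    current ::= seg
    logSize += seg.logSize
    indexSize += seg.indexSize
  }
  if (current.nonEmpty) groups += current.reverse
  groups.toList
}
{code}
Size alone still bounds how much the cleaner merges, so groups stay large enough to avoid shrinking segments; the offset bound only splits the pathological low-cardinality/high-offset groups described above.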
-- This message was sent by Atlassian JIRA (v6.3.4#6332)