[ https://issues.apache.org/jira/browse/KAFKA-3323 ]

Michael Schiff updated KAFKA-3323:
----------------------------------
    Comment: was deleted

(was: Re-opening so that I can attach the log-splitter patch)

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3323
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3323
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8.1.1, 0.8.2.1
>            Reporter: Michael Schiff
>            Assignee: Jay Kreps
>         Attachments: index_dump.txt, log_dump.txt
>
>
> Once the offset index contains negative offset values, the binary search 
> used for position lookup is broken. This causes consumers of compacted 
> topics to skip large offset intervals when bootstrapping, which has serious 
> implications for those consumers.
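> For illustration, a minimal sketch of why lookup breaks (the relative 
> offsets below are made-up values, not taken from the attached dumps): the 
> index stores 4-byte relative offsets that the position lookup's binary 
> search assumes are in ascending order, and an overflowed entry violates 
> that ordering.
> {code}
> import java.util.Arrays
>
> // Made-up index contents: the last relative offset overflowed to a
> // negative value, so the array is no longer sorted.
> val relativeOffsets: Array[Int] = Array(0, 512, 1024, -2147482624)
>
> // binarySearch requires ascending order; on unsorted input its result
> // is undefined, so a lookup can land far from the target offset and a
> // bootstrapping consumer can skip a large offset interval.
> println(Arrays.binarySearch(relativeOffsets, 1024))
> {code}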
> {code}
>   /**
>    * Append an entry for the given offset/location pair to the index. This
>    * entry must have a larger offset than all previous entries.
>    */
>   def append(offset: Long, position: Int) {
>     inLock(lock) {
>       require(!isFull, "Attempt to append to a full index (size = " + size + ").")
>       if (size.get == 0 || offset > lastOffset) {
>         debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
>         // Silent overflow here when (offset - baseOffset) > Int.MaxValue:
>         this.mmap.putInt((offset - baseOffset).toInt)
>         this.mmap.putInt(position)
>         this.size.incrementAndGet()
>         this.lastOffset = offset
>         require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
>       } else {
>         throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
>           .format(offset, entries, lastOffset, file.getAbsolutePath))
>       }
>     }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) fits in an Int 
> without overflow. If the LogSegment comes from a compacted topic, this 
> assumption may not hold. The result is a silent integer overflow, which 
> stores a negative value into the index.
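> A minimal sketch of the arithmetic (baseOffset and offset are hypothetical 
> values chosen only to show the overflow):
> {code}
> val baseOffset: Long = 0L
> val offset: Long = 3000000000L  // a surviving record's absolute offset
>
> // OffsetIndex.append narrows the delta to an Int; any delta above
> // Int.MaxValue (2147483647) wraps around to a negative value.
> val relative: Int = (offset - baseOffset).toInt  // -1294967296
> {code}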
> I believe the issue is caused by the LogCleaner, specifically by the 
> groupings produced by
> {code}
>   /**
>    * Group the segments in a log into groups totaling less than a given size.
>    * The size is enforced separately for the log data and the index data.
>    * We collect a group of such segments together into a single destination
>    * segment. This prevents segment sizes from shrinking too much.
>    *
>    * @param segments The log segments to group
>    * @param maxSize the maximum size in bytes for the total of all log data in a group
>    * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
>    *
>    * @return A list of grouped segments
>    */
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method groups only by size, without taking the group's 
> baseOffset and maximum offset into account, it can produce groups that, 
> when cleaned into a single segment, contain offsets whose delta from the 
> segment's baseOffset overflows an Int. This is most likely for topics with 
> low key cardinality but high update volume: cleaning can leave very few 
> records, yet those records carry very high offsets.
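> One possible direction for a fix (a sketch only, not an actual patch, 
> assuming LogSegment exposes baseOffset and index.lastOffset as in 0.8.x; 
> the helper name is hypothetical): close a group whenever the offset span 
> it would cover no longer fits in an Int, so the merged segment's index can 
> always store (offset - baseOffset) safely.
> {code}
> // Hypothetical guard for the grouping loop: `group` is the list of
> // segments collected so far (newest first, so group.last holds the
> // group's base offset) and `candidate` is the segment about to be added.
> def spanOverflows(group: List[LogSegment], candidate: LogSegment): Boolean =
>   candidate.index.lastOffset - group.last.baseOffset > Int.MaxValue
> {code}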


