Michael Schiff created KAFKA-3323:
-------------------------------------

             Summary: Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled
                 Key: KAFKA-3323
                 URL: https://issues.apache.org/jira/browse/KAFKA-3323
             Project: Kafka
          Issue Type: Bug
          Components: log
    Affects Versions: 0.8.1.1
            Reporter: Michael Schiff
            Assignee: Jay Kreps


{code}
  /**
   * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all previous entries.
   */
  def append(offset: Long, position: Int) {
    inLock(lock) {
      require(!isFull, "Attempt to append to a full index (size = " + size + ").")
      if (size.get == 0 || offset > lastOffset) {
        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
        this.mmap.putInt((offset - baseOffset).toInt)
        this.mmap.putInt(position)
        this.size.incrementAndGet()
        this.lastOffset = offset
        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
      } else {
        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
          .format(offset, entries, lastOffset, file.getAbsolutePath))
      }
    }
  }
{code}

OffsetIndex.append assumes that (offset - baseOffset) can be represented as an integer without overflow. If the LogSegment comes from a compacted topic, this assumption may not hold. The result is a silent integer overflow that stores a negative relative offset into the index. This breaks the binary search used to look up offset positions, so consumers bootstrapping themselves on the topic skip large intervals of offsets.
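
For illustration, here is a minimal standalone sketch (plain Scala, not Kafka code) of the silent wrap-around: once the delta exceeds Int.MaxValue, the .toInt conversion that OffsetIndex.append performs produces a negative value, and that is what ends up in the index.

{code}
// Standalone sketch of the overflow; baseOffset/offset values are made up.
object RelativeOffsetOverflow extends App {
  val baseOffset = 0L
  val offset     = Int.MaxValue.toLong + 1L   // delta no longer fits in an Int

  val delta    = offset - baseOffset          // 2147483648
  val relative = delta.toInt                  // wraps to -2147483648

  println(s"delta = $delta, stored relative offset = $relative")
}
{code}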

I believe the issue is caused by the LogCleaner, specifically by the segment groupings produced by:
{code}
  /**
   * Group the segments in a log into groups totaling less than a given size. The size is enforced separately for the log data and the index data.
   * We collect a group of such segments together into a single destination segment. This prevents segment sizes from shrinking too much.
   *
   * @param segments The log segments to group
   * @param maxSize the maximum size in bytes for the total of all log data in a group
   * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
   *
   * @return A list of grouped segments
   */
  private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
{code}

Since this method is concerned only with grouping by size, without taking baseOffset and groupMaxOffset into account, it can produce groups that, when cleaned into a single destination segment, contain offsets whose delta from that segment's baseOffset overflows an Int. This is more likely for topics with low key cardinality but high update volume, where cleaning can leave very few records yet very high offsets.
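
As a sketch of the missing constraint (SegmentRange and offsetsFitInSingleIndex are illustrative names, not existing Kafka code), the grouping would also need a check along these lines before merging segments into a single destination segment:

{code}
object GroupOffsetCheck extends App {
  // Illustrative stand-in for a LogSegment's offset range, not Kafka's class.
  case class SegmentRange(baseOffset: Long, lastOffset: Long)

  // A candidate group can safely be cleaned into one destination segment only
  // if the highest offset's delta from the group's base offset still fits in
  // the 4-byte relative offset that OffsetIndex stores.
  def offsetsFitInSingleIndex(group: Seq[SegmentRange]): Boolean =
    group.nonEmpty &&
      (group.last.lastOffset - group.head.baseOffset) <= Int.MaxValue.toLong

  // A heavily compacted, low-cardinality topic: few records, huge offset span.
  val group = Seq(SegmentRange(0L, 100L), SegmentRange(3000000000L, 3000000050L))
  println(offsetsFitInSingleIndex(group))   // false -> merging would overflow the index
}
{code}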



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
