[ https://issues.apache.org/jira/browse/KAFKA-5316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16030029#comment-16030029 ]
ASF GitHub Bot commented on KAFKA-5316:
---------------------------------------
Github user asfgit closed the pull request at:
https://github.com/apache/kafka/pull/3166
> Log cleaning can increase message size and cause cleaner to crash with buffer
> overflow
> --------------------------------------------------------------------------------------
>
> Key: KAFKA-5316
> URL: https://issues.apache.org/jira/browse/KAFKA-5316
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Fix For: 0.11.0.0
>
>
> We have observed in practice that a compressed record set can grow after
> cleaning. Since the cleaner's input and output buffers are the same size,
> this can overflow the output buffer:
> {code}
> [2017-05-23 15:05:15,480] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.nio.BufferOverflowException
>     at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
>     at org.apache.kafka.common.record.LogEntry.writeTo(LogEntry.java:104)
>     at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:163)
>     at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:114)
>     at kafka.log.Cleaner.cleanInto(LogCleaner.scala:468)
>     at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
>     at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
>     at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
>     at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at kafka.log.Cleaner.clean(LogCleaner.scala:362)
>     at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
>     at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-05-23 15:05:15,481] INFO [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> {code}
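> For illustration, the failure reduces to writing a grown record set into an
> output buffer sized exactly like the input buffer. A minimal standalone
> sketch of that mechanism (not the cleaner code itself):
> {code}
> import java.nio.ByteBuffer;
>
> public class OverflowDemo {
>     public static void main(String[] args) {
>         byte[] input = new byte[128];                           // stands in for the cleaner's read buffer contents
>         ByteBuffer output = ByteBuffer.allocate(input.length);  // output buffer sized identically to the input
>
>         byte[] recompressed = new byte[input.length + 16];      // cleaning/recompression grew the record set
>         output.put(recompressed);                               // throws java.nio.BufferOverflowException
>     }
> }
> {code}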
> It is also possible for a compressed message set to grow beyond the max
> message size after cleaning. Because KIP-74 changed fetch semantics so that
> the first message set is always returned to the consumer even when it exceeds
> the fetch size, the suggestion for this case is to allow the recompressed
> message set to exceed the max message size. This should be rare in practice
> and, as illustrated below, won't prevent consumers from making progress.
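> The consumer-side guarantee this relies on can be seen in a small standalone
> sketch (broker address, topic name, and group id are assumptions): even with
> {{max.partition.fetch.bytes}} set below the size of an oversized message set,
> a post-KIP-74 broker still returns that set, so the consumer's position keeps
> advancing.
> {code}
> import java.time.Duration;
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> public class Kip74FetchDemo {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
>         props.put("group.id", "kip74-demo");               // illustrative group id
>         props.put("key.deserializer",
>                   "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.put("value.deserializer",
>                   "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         // Deliberately smaller than an oversized, recompressed message set.
>         props.put("max.partition.fetch.bytes", "1024");
>
>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>             consumer.subscribe(Collections.singletonList("compacted-topic")); // assumed topic
>             // Pre-KIP-74, an oversized message could stall the consumer forever;
>             // post-KIP-74 the broker returns it anyway and the offset advances.
>             ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
>             System.out.println("fetched " + records.count() + " records");
>         }
>     }
> }
> {code}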
> To handle the overflow issue, one option is to allocate a temporary buffer
> when filtering in {{MemoryRecords.filterTo}} and return it in the result. As
> an optimization, we could fall back to this only when a single recompressed
> message set is larger than the entire write buffer; a sketch follows below.
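> A minimal sketch of that option (illustrative only, not the actual patch):
> grow into a temporary heap buffer when the filtered output no longer fits,
> and hand back whichever buffer was actually used. The FilterResult shape and
> method names here are assumptions.
> {code}
> import java.nio.ByteBuffer;
>
> final class FilterResult {
>     final ByteBuffer output;  // either the caller's buffer or a larger temporary one
>     FilterResult(ByteBuffer output) { this.output = output; }
> }
>
> final class GrowableFilter {
>     // Writes 'filtered' into 'destination' if it fits; otherwise allocates a
>     // temporary buffer just large enough, so a recompressed record set that
>     // grew past the write buffer no longer raises BufferOverflowException.
>     static FilterResult writeFiltered(ByteBuffer destination, byte[] filtered) {
>         ByteBuffer out = destination;
>         if (filtered.length > out.remaining()) {
>             out = ByteBuffer.allocate(filtered.length);  // temporary over-sized buffer
>         }
>         out.put(filtered);
>         out.flip();  // prepare the buffer for reading by the caller
>         return new FilterResult(out);
>     }
> }
> {code}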