[ https://issues.apache.org/jira/browse/KAFKA-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16285148#comment-16285148 ]
James Cheng commented on KAFKA-3955:
------------------------------------

log.message.format.version = 0.10.0.1

> Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to
> failed broker boot
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3955
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3955
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.0.0, 0.10.1.1, 0.11.0.0, 1.0.0
>            Reporter: Tom Crayford
>            Priority: Critical
>              Labels: reliability
>             Fix For: 1.0.1
>
>
> Hi,
> I've found a bug impacting kafka brokers on startup after an unclean
> shutdown. If a log segment is corrupt and has non-monotonic offsets (see the
> appendix of this bug for a sample output from {{DumpLogSegments}}), then
> {{LogSegment.recover}} throws an {{InvalidOffsetException}} error:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala#L218
> That code is called by {{LogSegment.recover}}:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L191
> Which is called in several places in {{Log.scala}}. Notably it's called four
> times during recovery:
> Thrice in Log.loadSegments
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L199
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L204
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L226
> and once in Log.recoverLog
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L268
> Of these, only the very last one has a {{catch}} for
> {{InvalidOffsetException}}. When that catches the issue, it truncates the
> whole log (not just this segment):
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L274
> to the start segment of the bad log segment.
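The catch-and-truncate behaviour described for {{Log.recoverLog}} can be illustrated with a small, self-contained Scala sketch. It is only a model under stated assumptions: {{Segment}}, {{RecoverySketch}} and the local {{InvalidOffsetException}} are hypothetical stand-ins rather than Kafka's classes, a segment is reduced to a list of message offsets, and "truncate" is modelled as dropping the bad segment and everything after it.

```scala
// Hypothetical stand-ins for illustration; these are not Kafka's real classes.
final class InvalidOffsetException(msg: String) extends RuntimeException(msg)

// A segment is modelled as its base offset plus the message offsets it holds.
final case class Segment(baseOffset: Long, offsets: Seq[Long])

object RecoverySketch {
  // Walk a segment's offsets as if rebuilding its index, rejecting any offset
  // that is not strictly larger than the previously appended one.
  def recover(segment: Segment): Unit = {
    var last = Option.empty[Long]
    segment.offsets.foreach { offset =>
      if (last.exists(offset <= _))
        throw new InvalidOffsetException(
          s"Attempt to append an offset ($offset) no larger than " +
            s"the last offset appended (${last.get})")
      last = Some(offset)
    }
  }

  // Recover segments in order. On the first InvalidOffsetException, keep only
  // the segments before the bad one (assumed model of truncating the log to
  // the bad segment's start instead of letting the exception escape).
  def recoverLog(segments: Seq[Segment]): Seq[Segment] = {
    val firstBad = segments.indexWhere { segment =>
      try { recover(segment); false }
      catch { case _: InvalidOffsetException => true }
    }
    if (firstBad < 0) segments else segments.take(firstBad)
  }

  def main(args: Array[String]): Unit = {
    // The first segment is made up; the second reuses offsets from the report.
    val good = Segment(13000000L, Seq(13000000L, 13000001L))
    val bad  = Segment(14008931L, Seq(15000361L, 15000362L, 15000337L))
    // Only the first segment survives recovery.
    println(recoverLog(Seq(good, bad)).map(_.baseOffset))
  }
}
```

In this model the exception never leaves {{recoverLog}}, whereas a bare call to {{recover}} on the bad segment propagates it to the caller, which is the difference between the one call site with a {{catch}} and the three without.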
> However, this code can't be hit on recovery, because of the code paths in
> {{loadSegments}} - they mean we'll never hit truncation here, as we always
> throw this exception and that goes all the way to the top-level exception
> handler and crashes the JVM.
> As {{Log.recoverLog}} is always called during recovery, I *think* a fix for
> this is to move this crash recovery/truncate code inside a new method in
> {{Log.scala}}, and call that instead of {{LogSegment.recover}} in each place.
> That code should return the number of {{truncatedBytes}} like we do in
> {{Log.recoverLog}} and then truncate the log. The callers will have to be
> told to stop iterating over files in the directory, likely via a return
> value of {{truncatedBytes}} like {{Log.recoverLog}} does right now.
> I'm happy working on a patch for this. I'm aware this recovery code is tricky
> and important to get right.
> I'm also curious (and don't have any good theories yet) as to how
> this log segment got into this state with non-monotonic offsets. This segment
> is using gzip compression, and is under 0.9.0.1. The same bug with respect to
> recovery exists in trunk, but I'm unsure if the new handling around
> compressed messages (KIP-31) means the bug where non-monotonic offsets get
> appended is still present in trunk.
> As a production workaround, one can manually truncate that log folder
> (delete all .index/.log files including and after the one with the
> bad offset). However, kafka should (and can) handle this case well - with
> replication we can truncate on broker startup.
> stacktrace and error message:
> {code}
> pri=WARN t=pool-3-thread-4 at=Log Found a corrupted index file, /$DIRECTORY/$TOPIC-22/00000000000014306536.index, deleting and rebuilding index...
> pri=ERROR t=main at=LogManager There was an error in one of the threads during logs loading: kafka.common.InvalidOffsetException: Attempt to append an offset (15000337) to position 111719 no larger than the last offset appended (15000337) to /$DIRECTORY/$TOPIC-8/00000000000014008931.index.
> pri=FATAL t=main at=KafkaServer Fatal error during KafkaServer startup. Prepare to shutdown kafka.common.InvalidOffsetException: Attempt to append an offset (15000337) to position 111719 no larger than the last offset appended (15000337) to /$DIRECTORY/$TOPIC-8/00000000000014008931.index.
>         at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>         at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>         at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>         at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>         at kafka.log.LogSegment.recover(LogSegment.scala:188)
>         at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>         at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>         at scala.collection.TraversableLike$With...
> ...Filter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>         at kafka.log.Log.loadSegments(Log.scala:160)
>         at kafka.log.Log.<init>(Log.scala:90)
>         at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>         at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> non-monotonic offsets from DumpLogSegments tool (with --deep-iteration) (they
> go from 15000362 to 15000337):
> {code}
> offset: 15000361 position: 485145166 isvalid: true payloadsize: 1474 magic: 0 compresscodec: NoCompressionCodec crc: 3958745838 keysize: 36
> offset: 15000362 position: 485145166 isvalid: true payloadsize: 1474 magic: 0 compresscodec: NoCompressionCodec crc: 374463118 keysize: 36
> offset: 15000337 position: 485149591 isvalid: true payloadsize: 1474 magic: 0 compresscodec: NoCompressionCodec crc: 3955938191 keysize: 36
> offset: 15000338 position: 485149591 isvalid: true payloadsize: 1474 magic: 0 compresscodec: NoCompressionCodec crc: 4121377803 keysize: 36
> offset: 15000339 position: 485149591 isvalid: true payloadsize: 1474 magic: 0 compresscodec: NoCompressionCodec crc: 372883992 keysize: 36
> offset: 15000340 position: 485149591 isvalid: true payloadsize: 1474 magic: 0 compresscodec: NoCompressionCodec crc: 1294476491 keysize: 36
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)