[ https://issues.apache.org/jira/browse/KAFKA-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16285148#comment-16285148 ]

James Cheng commented on KAFKA-3955:
------------------------------------

log.message.format.version = 0.10.0.1

> Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to 
> failed broker boot
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3955
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3955
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.0.0, 0.10.1.1, 0.11.0.0, 1.0.0
>            Reporter: Tom Crayford
>            Priority: Critical
>              Labels: reliability
>             Fix For: 1.0.1
>
>
> Hi,
> I've found a bug impacting kafka brokers on startup after an unclean 
> shutdown. If a log segment is corrupt and has non-monotonic offsets (see the 
> appendix of this bug for a sample output from {{DumpLogSegments}}), then 
> {{LogSegment.recover}} throws an {{InvalidOffsetException}} error: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala#L218
> That code is called by {{LogSegment.recover}}: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L191
> {{LogSegment.recover}} is in turn called in several places in {{Log.scala}}; 
> notably, it is called four times during recovery:
> Three times in {{Log.loadSegments}}:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L199
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L204
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L226
> and once in Log.recoverLog
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L268
> Of these, only the very last one has a {{catch}} for 
> {{InvalidOffsetException}}. When that handler catches the exception, it 
> truncates the whole log (not just the bad segment) back to the start offset 
> of the bad log segment: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L274
> However, that truncation can never run during recovery: the three calls in 
> {{loadSegments}} happen first and have no such {{catch}}, so the exception 
> propagates all the way to the top-level exception handler and crashes the 
> JVM before {{Log.recoverLog}} is ever reached.
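> The invariant the index enforces can be seen in a toy model. This is a 
> simplified, hypothetical sketch (names and types are illustrative, not the 
> real {{OffsetIndex}} code): each appended offset must be strictly larger 
> than the last, otherwise an InvalidOffsetException-style error is thrown - 
> which is exactly what recovery trips over on a corrupt segment.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-in for kafka.log.OffsetIndex (illustrative only).
class ToyOffsetIndex {
  private val entries = ArrayBuffer.empty[(Long, Int)] // (offset, file position)

  def append(offset: Long, position: Int): Unit = {
    // The monotonicity check: reject any offset not larger than the last one.
    if (entries.nonEmpty && offset <= entries.last._1)
      throw new IllegalStateException(
        s"Attempt to append an offset ($offset) no larger than " +
          s"the last offset appended (${entries.last._1})")
    entries += ((offset, position))
  }

  def lastOffset: Option[Long] = entries.lastOption.map(_._1)
}
```

> Feeding it the offsets from the dump below (15000361, 15000362, then 
> 15000337) makes the third append throw, just as the broker does on boot.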
> As {{Log.recoverLog}} is always called during recovery, I *think* a fix for 
> this is to move this crash recovery/truncate code into a new method in 
> {{Log.scala}}, and call that instead of {{LogSegment.recover}} in each place. 
> That method should return the number of {{truncatedBytes}}, as 
> {{Log.recoverLog}} does, and then truncate the log. The callers will have to 
> be told to "stop iterating over files in the directory", likely via that 
> {{truncatedBytes}} return value, as {{Log.recoverLog}} does right now.
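> To make the intended shape concrete, here is a hedged miniature of that 
> proposal: one helper recovers segments in order, catches the invalid-offset 
> error, drops the bad segment and everything after it, and returns the 
> truncated byte count so the caller knows to stop iterating. {{ToySegment}} 
> and {{recoverSegments}} are invented names, not the real {{kafka.log}} API.

```scala
// Toy segment: recover() throws on a corrupt (non-monotonic) segment.
final case class ToySegment(baseOffset: Long, bytes: Long, corrupt: Boolean) {
  def recover(): Unit =
    if (corrupt)
      throw new IllegalStateException(s"non-monotonic offsets in segment $baseOffset")
}

// Recover segments in order; on the first failure, truncate that segment and
// everything after it. A non-zero truncatedBytes tells the caller to stop
// iterating over files in the directory, as Log.recoverLog signals today.
def recoverSegments(segments: List[ToySegment]): (List[ToySegment], Long) = {
  val (ok, rest) = segments.span { s =>
    try { s.recover(); true }
    catch { case _: IllegalStateException => false }
  }
  (ok, rest.map(_.bytes).sum)
}
```

> The key difference from today's behaviour is that the exception is absorbed 
> and turned into truncation instead of escaping and killing the JVM.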
> I'm happy working on a patch for this. I'm aware this recovery code is tricky 
> and important to get right.
> I'm also curious (and don't have a good theory yet) as to how this log 
> segment got into this state with non-monotonic offsets. The segment uses 
> gzip compression and was written under 0.9.0.1. The same recovery bug 
> exists on trunk, but I'm unsure whether the new handling of compressed 
> messages (KIP-31) means the bug that appends non-monotonic offsets is still 
> present there.
> As a production workaround, one can manually truncate the affected log 
> folder by hand (delete all .index/.log files from the one containing the 
> bad offset onwards). However, Kafka should (and can) handle this case well 
> itself - with replication, we can safely truncate during broker startup.
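> The file-selection step of that workaround can be expressed as a filter 
> over segment file names. This is a hypothetical helper, not a Kafka tool: 
> it only computes the list, and deleting the files is left to the operator. 
> Segment files are named by zero-padded base offset, so everything at or 
> after the bad segment's base offset is selected.

```scala
// Hypothetical helper for the manual workaround: given the base offset of the
// first corrupt segment, pick the .log/.index files to delete. Does not touch
// the filesystem.
def filesToDelete(segmentFiles: Seq[String], badBaseOffset: Long): Seq[String] =
  segmentFiles.filter { name =>
    // Segment files are named by zero-padded base offset, e.g.
    // 00000000000014306536.log / 00000000000014306536.index
    val base = name.takeWhile(_ != '.').toLong
    base >= badBaseOffset
  }
```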
> stacktrace and error message:
> {code}
> pri=WARN  t=pool-3-thread-4 at=Log Found a corrupted index file, 
> /$DIRECTORY/$TOPIC-22/00000000000014306536.index, deleting and rebuilding 
> index...
> pri=ERROR t=main at=LogManager There was an error in one of the threads 
> during logs loading: kafka.common.InvalidOffsetException: Attempt to append 
> an offset (15000337) to position 111719 no larger than the last offset 
> appended (15000337) to /$DIRECTORY/$TOPIC-8/00000000000014008931.index.
> pri=FATAL t=main at=KafkaServer Fatal error during KafkaServer startup. 
> Prepare to shutdown kafka.common.InvalidOffsetException: Attempt to append an 
> offset (15000337) to position 111719 no larger than the last offset appended 
> (15000337) to /$DIRECTORY/$TOPIC-8/00000000000014008931.index.
>         at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>         at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>         at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>         at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>         at kafka.log.LogSegment.recover(LogSegment.scala:188)
>         at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>         at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>         at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>         at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>         at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>         at kafka.log.Log.loadSegments(Log.scala:160)
>         at kafka.log.Log.<init>(Log.scala:90)
>         at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>         at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> non-monotonic offsets from DumpLogSegments tool (with --deep-iteration) (they 
> go from 15000362 to 15000337):
> {code}
> offset:  15000361  position:  485145166  isvalid:  true  payloadsize:  1474  
> magic:  0  compresscodec:  NoCompressionCodec  crc:  3958745838  keysize:  36
> offset:  15000362  position:  485145166  isvalid:  true  payloadsize:  1474  
> magic:  0  compresscodec:  NoCompressionCodec  crc:  374463118   keysize:  36
> offset:  15000337  position:  485149591  isvalid:  true  payloadsize:  1474  
> magic:  0  compresscodec:  NoCompressionCodec  crc:  3955938191  keysize:  36
> offset:  15000338  position:  485149591  isvalid:  true  payloadsize:  1474  
> magic:  0  compresscodec:  NoCompressionCodec  crc:  4121377803  keysize:  36
> offset:  15000339  position:  485149591  isvalid:  true  payloadsize:  1474  
> magic:  0  compresscodec:  NoCompressionCodec  crc:  372883992   keysize:  36
> offset:  15000340  position:  485149591  isvalid:  true  payloadsize:  1474  
> magic:  0  compresscodec:  NoCompressionCodec  crc:  1294476491  keysize:  36
> {code}
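> A quick way to spot such a regression programmatically - an illustrative 
> scan over the offsets printed above, not anything Kafka ships - is to find 
> the first position where the sequence stops strictly increasing:

```scala
// Returns the index of the first offset that is not strictly larger than its
// predecessor, or None if the sequence is monotonic.
def firstNonMonotonic(offsets: Seq[Long]): Option[Int] =
  offsets.sliding(2).zipWithIndex.collectFirst {
    case (Seq(a, b), i) if b <= a => i + 1
  }
```

> On the offsets from the dump above this flags the third entry, where 
> 15000362 is followed by 15000337.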



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
