[ https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362865#comment-15362865 ]

Jun Rao commented on KAFKA-3919:
--------------------------------

[~BigAndy], thanks for the investigation and the additional information.

Let me first explain how log reconciliation normally works. Each broker maintains
the last committed offset for each partition and stores that information in a 
checkpoint file replication-offset-checkpoint. A message is only considered 
committed if it's received by all in-sync replicas. The leader advances the 
last committed offset and propagates it to the followers. So, the follower's 
last committed offset is always <= the leader's. When a replica becomes a 
leader, it won't do any truncation to its log and will instead try to commit 
all messages in its local log. When a replica becomes a follower, it will first 
truncate its log to the last committed offset stored in its local checkpoint 
file and then start replicating from that offset. If unclean leader election is 
disabled, after truncation, the follower's last offset should always be <= the 
leader's last offset.
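
To make that concrete, here is a rough sketch of the become-follower step (simplified placeholder types only, not the actual replica manager code):

{noformat}
// Simplified illustration only -- the Log type here is a stand-in, not kafka.log.Log.
case class Log(var logEndOffset: Long) {
  def truncateTo(offset: Long): Unit =
    if (offset < logEndOffset) logEndOffset = offset
}

// lastCommittedOffset is what the broker reads back for this partition from
// its local replication-offset-checkpoint file.
def makeFollower(log: Log, lastCommittedOffset: Long): Long = {
  log.truncateTo(lastCommittedOffset) // truncate to the local last committed offset
  log.logEndOffset                    // then start replicating from this offset
}
{noformat}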

Another thing we do is that if a broker is shut down forcefully, on startup, we 
will do log recovery to remove any corrupted messages. In your case, it seems 
what happens is that when the new leader (2011) comes up, its log is actually 
corrupted and therefore it has to truncate some messages. This potentially will 
remove messages that have been previously committed. Then broker 2012 comes up 
and becomes the follower. It's possible that after the follower 
truncates its log to its local last committed offset, that offset is actually 
larger than the last recovered offset in the leader. If no new messages have 
been appended to the new leader, the follower will realize that its offset is 
out of range and will truncate its log again to the leader's last offset. 
However, in this case, it seems some new messages have been published to the 
new leader and the follower's offset is actually in range. It's just that the 
follower's offset may now point to a completely new set of messages. In this 
case if the follower's offset points to the middle of a compressed message set, 
the follower will get the whole compressed message set and append it to its 
local log. Currently, the follower only ensures that the last offset in a 
compressed message set is larger than the last offset in its log, but not the 
first offset. This seems to be the situation that you are in.
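
In other words, the follower's append-time check is roughly the following (a simplified sketch of the behavior described above, not the actual kafka.log.Log code):

{noformat}
// A compressed message set as the follower sees it: only the last (shallow)
// offset is validated against the local log end offset.
case class CompressedSet(firstOffset: Long, lastOffset: Long)

def followerAccepts(logEndOffset: Long, set: CompressedSet): Boolean =
  // Accepted as long as the set ends beyond the log end offset, even when
  // set.firstOffset < logEndOffset -- which is exactly the offset "dip".
  set.lastOffset > logEndOffset
{noformat}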

There is still one thing not very clear to me. When building indexes during log 
recovery, we actually only add index entries at the boundaries of compressed 
message sets. So as long as the last offset of each compressed set keeps 
increasing, we won't hit the InvalidOffsetException in the description. Could 
you check whether 1239742691 is the last offset of a compressed set and if so 
whether there is a case that the last offset of a compressed set is out of 
order in the log?
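
For reference, recovery rebuilds the index roughly like this (a simplified sketch, not the actual LogSegment.recover code; ShallowEntry is a made-up stand-in for a shallow message entry):

{noformat}
// An index entry is only added at a compressed-set boundary, using that set's
// shallow (i.e. last) offset, and only about once every index.interval.bytes
// (4K by default). So hitting InvalidOffsetException during recovery implies
// that the last offset of some compressed set is not larger than the last
// offset already indexed.
case class ShallowEntry(lastOffset: Long, sizeInBytes: Int)

def rebuildIndex(entries: Seq[ShallowEntry], indexIntervalBytes: Int = 4096): Unit = {
  var lastIndexedOffset = -1L
  var bytesSinceLastEntry = 0
  for (e <- entries) {
    if (bytesSinceLastEntry > indexIntervalBytes) {
      // OffsetIndex.append throws InvalidOffsetException in this situation.
      require(e.lastOffset > lastIndexedOffset,
        s"Attempt to append an offset (${e.lastOffset}) no larger than " +
        s"the last offset appended (${lastIndexedOffset})")
      lastIndexedOffset = e.lastOffset
      bytesSinceLastEntry = 0
    }
    bytesSinceLastEntry += e.sizeInBytes
  }
}
{noformat}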

> Broker fails to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3919
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Andy Coates
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>       Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>       kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
>       at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>       at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>       at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>       at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>       at kafka.log.LogSegment.recover(LogSegment.scala:188)
>       at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>       at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>       at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>       at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>       at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>       at kafka.log.Log.loadSegments(Log.scala:160)
>       at kafka.log.Log.<init>(Log.scala:90)
>       at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>       at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non-monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue, (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled, (we did later enable 
> them to help get around some other issues we were having, but this was 
> several hours after this issue manifested), and we're producing to the topic 
> with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found is that the initial part of the log has monotonically increasing 
> offsets, where each compressed batch normally contained one or two records. 
> Then there is a batch that contains many records, whose first record has an 
> offset below the previous batch and whose last record has an offset above the 
> previous batch. Following on from this there continues a period of large 
> batches, with monotonically increasing offsets, and then the log returns to 
> batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal operation. The period of larger 
> batches is from just after the outage, where producers have a backlog to 
> process when the partition becomes available, and then things return to 
> normal batch sizes again once the backlog clears.
> We did also look through the Kafka application logs to try and piece 
> together the series of events leading up to this. Here’s what we know 
> happened, with regards to one partition that has issues, from the logs:
> Prior to outage:
> * Replicas for the partition are brokers 2011, 2012,  2024, with 2024 being 
> the preferred leader.
> * Producers using acks=1, compression=gzip
> * Brokers configured with unclean.elections=false, zk.session-timeout=36s
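> (In broker/producer property terms, that's roughly the following, assuming the 0.9 config names:)
> {noformat}
> # Assumed settings corresponding to the above:
> unclean.leader.election.enable=false
> zookeeper.session.timeout.ms=36000
> # producer:
> acks=1
> compression.type=gzip
> {noformat}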
> Post outage:
> * 2011 comes up first, (also as the Controller), recovers unflushed log 
> segment 1239444214, completes load with offset 1239740602, and becomes leader 
> of the partition.
> * 2012 comes up next, recovers unflushed log segment 1239444214, truncates to 
> offset 1239742830, (that's 2,228 records ahead of the recovered offset of the 
> current leader), and starts following.
> * 2024 comes up quickly after 2012, recovers unflushed log segment 
> 1239444214, truncates to offset 1239742250, (that's 1,648 records ahead of 
> the recovered offset of the current leader), and starts following.
> * The Controller adds 2024 to the replica set just before 2024 halts due to 
> another partition having an offset greater than the leader's.
> * The Controller adds 2012 to the replica set just before 2012 halts due to 
> another partition having an offset greater than the leader's.
> * When 2012 is next restarted, it fails to fully start as it's complaining of 
> invalid offsets in the log.
> You’ll notice that the offsets the brokers truncate to are different for each 
> of the three brokers. 
> We're assuming that when broker 2012 starts up and follows the leader it 
> requests records from its truncated offset, but that the logs have diverged on these 
> two brokers to the point that the requested offset corresponds within the 
> leader's log to the middle of a compressed record set, not at a record set 
> boundary.  The leader then returns the whole compressed set, which the 
> follower appends to its log - unknowingly introducing a dip in its otherwise 
> monotonically incrementing offsets.
> Several of our brokers were unlucky enough to have this dip at the 4K 
> boundary used by the offset indexer, causing a protracted outage.  We’ve 
> written a little utility that shows several more brokers have a dip outside 
> of the 4K boundary.
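> A minimal sketch of the kind of scan that utility does, (illustrative only, 
> working on per-batch first/last offsets, not our actual code):
> {noformat}
> // Walk the per-batch (shallow) first/last offsets in log order and flag any
> // batch whose first offset does not exceed the previous batch's last offset.
> case class BatchOffsets(first: Long, last: Long)
>
> def findDips(batches: Seq[BatchOffsets]): Seq[(BatchOffsets, BatchOffsets)] =
>   batches.sliding(2).collect {
>     case Seq(prev, cur) if cur.first <= prev.last => (prev, cur)
>   }.toSeq
> {noformat}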
> There are some assumptions in there, which I’ve not got around to confirming 
> / denying. (A quick attempt to recreate this failed and I've not found the 
> time to invest more).
> Of course I'd really appreciate the community / experts stepping in and 
> commenting on whether our assumptions are right or wrong, or if there is 
> another explanation to the problem. 
> Obviously, the fact that the broker got into this state and then won’t start 
> is a bug, and one I’d like to fix.  A Kafka broker should not corrupt 
> its own log during normal operation to the point that it can’t restart! :D
> A secondary issue is whether we think the divergent logs are acceptable. This 
> may be deemed acceptable given the producers have chosen availability over 
> consistency when they produced with acks = 1.  Though personally, the system 
> having diverging replicas of an immutable commit log just doesn't sit right.
> I see us having a few options here:
> * Have the replicas detect the divergence of their logs e.g. a follower 
> compares the checksum of its last record with the same offset on the leader. 
> The follower can then work out that its log has diverged from the leader.  At 
> which point it could either halt, stop replicating that partition, or search 
> backwards to find the point of divergence, truncate and recover, (possibly 
> saving the truncated part somewhere), as sketched after this list. This would 
> be a protocol change for Kafka.  This solution trades availability, (you’ve 
> got fewer ISRs during the extended re-sync process), for consistency.
> * Leave the logs as they are and have the indexing of offsets in the log on 
> start up handle such a situation gracefully.  This leaves logs in a divergent 
> state between replicas, (meaning replays would yield different messages if 
> the leader was to go down), but gives better availability, (no time spent not 
> being an ISR while it repairs any divergence).
> * Support multiple options and allow it to be tuned, ideally by topic.
> * Something else...
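> To illustrate the first option above, here's a rough sketch of the divergence 
> check, (illustrative only; the real thing would be a replica fetch protocol change):
> {noformat}
> // Compare the follower's record checksum at an offset with the leader's at
> // the same offset, walking backwards to find the last offset at which the
> // two logs still agree. The maps stand in for real log reads (offset -> CRC).
> def lastAgreedOffset(followerCrcs: Map[Long, Long],
>                      leaderCrcs: Map[Long, Long],
>                      startOffset: Long): Option[Long] =
>   (startOffset to 0L by -1L).find { offset =>
>     followerCrcs.get(offset).exists(f => leaderCrcs.get(offset).exists(_ == f))
>   }
> // The follower would then truncate to just after this offset and re-fetch,
> // trading availability for consistency as described above.
> {noformat}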
> I’m happy/keen to contribute here. But I’d like to first discuss which option 
> should be investigated.
> Andy



