[ https://issues.apache.org/jira/browse/KAFKA-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14494214#comment-14494214 ]

Rajini Sivaram commented on KAFKA-2118:
---------------------------------------

Looking at the code, the sequence for replacing segments is:
1) Cleaner cleans a sequence of segments into a new segment with the .cleaned 
suffix
2) The .cleaned file is renamed to .swap
3) The old segments are renamed, adding the .deleted suffix
4) Async deletion of the .deleted files is scheduled
5) The .swap suffix is removed from the swap file, so the swap file replaces 
the old segments
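
The rename sequence above can be sketched as transitions over an in-memory set 
of file names (a toy model with hypothetical names and offsets, not Kafka's 
actual code):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of the five-step swap sequence
// (illustrative only; not Kafka's actual code).
public class SwapSequence {
    public static Set<String> run() {
        Set<String> files = new HashSet<>(List.of("0.log", "1094621529.log", "1094831997.log"));
        // 1) clean the segments into a new segment with the .cleaned suffix
        files.add("0.log.cleaned");
        // 2) rename .cleaned to .swap
        files.remove("0.log.cleaned");
        files.add("0.log.swap");
        // 3) rename the old segments, adding the .deleted suffix
        for (String f : List.of("0.log", "1094621529.log", "1094831997.log")) {
            files.remove(f);
            files.add(f + ".deleted");
        }
        // 4) (scheduled, async) deletion of the .deleted files
        files.removeIf(f -> f.endsWith(".deleted"));
        // 5) strip the .swap suffix; the swap file replaces the old segments
        files.remove("0.log.swap");
        files.add("0.log");
        return files;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A crash between any two of these steps leaves the directory in one of the 
intermediate states, which is what recovery has to repair.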

The different scenarios for recovery are:
* If the broker crashes before 2), the .cleaned files are deleted on startup, 
aborting the clean operation. No issues here.
* There is an issue if the broker crashes between 2) and 3). If the .swap file 
was created but the old segment files were not yet renamed to .deleted, the 
current recovery process replaces the segment file with the swap file but does 
not delete the old segment files.
* If the broker crashes after 3), recovery works fine on startup because the 
old segment files no longer exist, and any .deleted files found on startup are 
deleted, completing 4). If the .swap suffix was not yet removed, it is removed 
on startup, completing 5). There are no issues here. As far as I can tell, 
graceful shutdown is similar to this case and should work fine, since the log 
files being deleted would already have been renamed to .deleted.
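
The scenarios above can be modeled with a simplified recovery routine (the 
names and logic here are hypothetical, not the actual Log.loadSegments code). 
With deleteOldSegments=false it reproduces the buggy middle case: the old 
segments survive on disk next to the recovered segment 0.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of startup recovery after a crash between steps 2) and 3):
// the .swap file exists alongside the original segment files.
// (Illustrative only; real recovery lives in Kafka's log-loading code.)
public class Recovery {
    public static Set<String> recover(Set<String> onDisk, boolean deleteOldSegments) {
        Set<String> files = new HashSet<>(onDisk);
        files.removeIf(f -> f.endsWith(".cleaned"));  // abort incomplete cleans
        files.removeIf(f -> f.endsWith(".deleted"));  // complete pending deletes
        Optional<String> swap = files.stream().filter(f -> f.endsWith(".swap")).findFirst();
        if (swap.isPresent()) {
            String target = swap.get().substring(0, swap.get().length() - ".swap".length());
            if (deleteOldSegments) {
                // the proposed fix: first delete every old segment the swap
                // file covers (here, crudely, all remaining .log files)
                files.removeIf(f -> f.endsWith(".log"));
            }
            files.remove(swap.get());
            files.add(target);  // finish the swap by stripping the .swap suffix
        }
        return files;
    }
}
```

Without the deletion step, the swap file simply replaces segment 0 while 
segments 1094621529 and 1094831997 remain on disk, so offsets are no longer 
monotonically increasing across segments.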

To fix the second scenario above, the recovery operation should delete any 
remaining log files corresponding to the .swap file before the .swap file 
replaces the actual log file.

The attached patch fixes the recovery process by invoking the same 
replaceSegments() method during recovery, leaving all the logic for crash-safe 
swapping in one place.


> Cleaner cannot clean after shutdown during replaceSegments
> ----------------------------------------------------------
>
>                 Key: KAFKA-2118
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2118
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.2.0
>            Reporter: Gian Merlino
>            Assignee: Rajini Sivaram
>         Attachments: KAFKA-2118.patch
>
>
> If a broker shuts down after the cleaner calls replaceSegments with more than 
> one segment, the partition can be left in an uncleanable state. We saw this 
> on a few brokers after doing a rolling update. The sequence of things we saw 
> is:
> 1) Cleaner cleaned segments with base offsets 0, 1094621529, and 1094831997 
> into a new segment 0.
> 2) Cleaner logged "Swapping in cleaned segment 0 for segment(s) 
> 0,1094621529,1094831997 in log xxx-15." and called replaceSegments.
> 3) 0.cleaned was renamed to 0.swap.
> 4) Broker shut down before deleting segments 1094621529 and 1094831997.
> 5) Broker started up and logged "Found log file 
> /mnt/persistent/kafka-logs/xxx-15/00000000000000000000.log.swap from 
> interrupted swap operation, repairing."
> 6) Cleaner thread died with the exception 
> "kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1094911424) to position 1003 no larger than the last offset appended 
> (1095045873) to 
> /mnt/persistent/kafka-logs/xxx-15/00000000000000000000.index.cleaned."
> I think what's happening in #6 is that when the broker started back up and 
> repaired the log, segment 0 ended up with a bunch of messages that were also 
> in segments 1094621529 and 1094831997 (because the new segment 0 was created 
> from cleaning all 3). But segments 1094621529 and 1094831997 were still on 
> disk, so offsets on disk were no longer monotonically increasing, violating 
> the assumption of OffsetIndex. We ended up fixing this by deleting segments 
> 1094621529 and 1094831997 manually, and then removing the line for this 
> partition from the cleaner-offset-checkpoint file (otherwise it would 
> reference the non-existent segment 1094621529).
> This can happen even on a clean shutdown (the async deletes in 
> replaceSegments might not happen).
> Cleaner logs post-startup:
> 2015-04-12 15:07:56,533 INFO [kafka-log-cleaner-thread-0] 
> kafka.log.LogCleaner - Cleaner 0: Beginning cleaning of log xxx-15.
> 2015-04-12 15:07:56,533 INFO [kafka-log-cleaner-thread-0] 
> kafka.log.LogCleaner - Cleaner 0: Building offset map for xxx-15...
> 2015-04-12 15:07:56,595 INFO [kafka-log-cleaner-thread-0] 
> kafka.log.LogCleaner - Cleaner 0: Building offset map for log xxx-15 for 6 
> segments in offset range [1094621529, 1095924157).
> 2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] 
> kafka.log.LogCleaner - Cleaner 0: Offset map for log xxx-15 complete.
> 2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] 
> kafka.log.LogCleaner - Cleaner 0: Cleaning log xxx-15 (discarding tombstones 
> prior to Sun Apr 12 14:05:37 UTC 2015)...
> 2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] 
> kafka.log.LogCleaner - Cleaner 0: Cleaning segment 0 in log xxx-15 (last 
> modified Sun Apr 12 14:05:38 UTC 2015) into 0, retaining deletes.
> 2015-04-12 15:08:04,283 INFO [kafka-log-cleaner-thread-0] 
> kafka.log.LogCleaner - Cleaner 0: Cleaning segment 1094621529 in log xxx-15 
> (last modified Sun Apr 12 13:49:27 UTC 2015) into 0, discarding deletes.
> 2015-04-12 15:08:05,079 INFO [kafka-log-cleaner-thread-0] 
> kafka.log.LogCleaner - Cleaner 0: Cleaning segment 1094831997 in log xxx-15 
> (last modified Sun Apr 12 14:04:28 UTC 2015) into 0, discarding deletes.
> 2015-04-12 15:08:05,157 ERROR [kafka-log-cleaner-thread-0] 
> kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
> kafka.common.InvalidOffsetException: Attempt to append an offset (1094911424) 
> to position 1003 no larger than the last offset appended (1095045873) to 
> /mnt/persistent/kafka-logs/xxx-15/00000000000000000000.index.cleaned.
> at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
> at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
> at kafka.log.LogSegment.append(LogSegment.scala:81)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:427)
> at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:358)
> at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:354)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:354)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:321)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:320)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at kafka.log.Cleaner.clean(LogCleaner.scala:320)
> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> 2015-04-12 15:08:05,157 INFO [kafka-log-cleaner-thread-0] 
> kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Stopped



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
