[
https://issues.apache.org/jira/browse/KAFKA-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274454#comment-15274454
]
Guozhang Wang commented on KAFKA-3587:
--------------------------------------
Oops, accidentally deleted my previous comment, re-posting here: "there are
multiple proposals and PRs for this ticket, let's centralize our discussion on
this ticket first and then move ahead to work on the PR."
> LogCleaner fails due to incorrect offset map computation on a replica
> ---------------------------------------------------------------------
>
> Key: KAFKA-3587
> URL: https://issues.apache.org/jira/browse/KAFKA-3587
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.9.0.1
> Environment: Linux
> Reporter: Kiran Pillarisetty
> Assignee: Edoardo Comar
> Attachments: 0001-POC-improving-deduping-segments.patch
>
>
> Log Cleaner fails to compact a segment even when the number of messages in it
> is smaller than the offset map's capacity.
> In version 0.9.0.1 (LogCleaner.scala -> buildOffsetMap()), LogCleaner
> computes the segment size by subtracting the segment's base offset from its
> latest offset ("segmentSize = segment.nextOffset() - segment.baseOffset").
> This works fine until you create another replica. When you create a replica,
> its segment could contain data that has already been compacted on the other
> brokers. Depending on the data, the offset difference can be larger than the
> offset map's capacity (maxDesiredMapSize), which causes LogCleaner to fail on
> that segment.
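> For reference, the failing check in Cleaner.buildOffsetMap() (0.9.0.1) is
> essentially the following (a paraphrased sketch, not the verbatim source;
> maxDesiredMapSize is the map's slot count scaled by the dedupe buffer load
> factor):
>
>     // Sketch of the 0.9.0.1 check: the segment "size" is an offset delta,
>     // not a message count, so it overestimates for segments whose offset
>     // range contains compaction gaps.
>     val segmentSize = segment.nextOffset() - segment.baseOffset
>     require(segmentSize <= maxDesiredMapSize,
>       ("%d messages in segment %s/%s but offset map can fit only %d. " +
>        "You can increase log.cleaner.dedupe.buffer.size or decrease " +
>        "log.cleaner.threads").format(
>         segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
>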
> Scenario:
> - Kafka 0.9.0.1
> - Cluster has two brokers.
> - Server.properties:
> log.cleaner.enable=true
> log.cleaner.dedupe.buffer.size=10485760 #10MB
> log.roll.ms=300000
> delete.topic.enable=true
> log.cleanup.policy=compact
> Steps to reproduce:
> 1. Create a topic with replication-factor of 1.
> ./kafka-topics.sh --zookeeper=localhost:2181 --create \
>   --topic test.log.compact.1M --partitions 1 --replication-factor 1 \
>   --config cleanup.policy=compact --config segment.ms=300000
> 2. Use kafka-console-producer.sh to produce a single message with the
> following key and value:
> LC1,{"test": "xyz"}
> 3. Use kafka-console-producer.sh to produce a large number of messages with
> the following key and value:
> LC2,{"test": "abc"}
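> (Producing keyed messages with kafka-console-producer.sh requires the
> parse.key=true and key.separator properties; with key.separator=',' the text
> before the comma becomes the key.) Steps 2 and 3 can also be scripted; a
> minimal sketch using the Java producer API from Scala, where the bootstrap
> server and message count are assumptions and the topic name is from step 1:
>
>     import java.util.Properties
>     import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
>
>     // Produce one LC1 record, then many LC2 records, so compaction leaves a
>     // large gap between the surviving offsets.
>     val props = new Properties()
>     props.put("bootstrap.servers", "localhost:9092")
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     val producer = new KafkaProducer[String, String](props)
>     producer.send(new ProducerRecord("test.log.compact.1M", "LC1", """{"test": "xyz"}"""))
>     for (_ <- 1 to 8000000)
>       producer.send(new ProducerRecord("test.log.compact.1M", "LC2", """{"test": "abc"}"""))
>     producer.close()
>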
> 4. Let the log cleaner run and make sure the log is compacted. Verify with:
> ./kafka-run-class.sh kafka.tools.DumpLogSegments \
>   --files 00000000000000000000.log --print-data-log
> Dumping 00000000000000000000.log
> Starting offset: 0
> offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": "xyz"}
> offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 payload: {"test": "abc"}
> 5. Increase the replication factor to 2, following these steps:
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 6. Notice that the log cleaner fails to compact the newly created replica
> with the following error:
> [2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 7206179 messages in segment test.log.compact.1M-0/00000000000000000000.log but offset map can fit only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
> at scala.Predef$.require(Predef.scala:219)
> at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
> at kafka.log.Cleaner.clean(LogCleaner.scala:322)
> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-04-18 14:49:45,601] INFO [kafka-log-cleaner-thread-0], Stopped (kafka.log.LogCleaner)
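> The map capacity in the error is consistent with the configuration above,
> assuming SkimpyOffsetMap's layout of 24 bytes per entry (16-byte MD5 hash +
> 8-byte offset) and the default load factor of 0.9:
>
>     val dedupeBufferSize  = 10485760                         // log.cleaner.dedupe.buffer.size
>     val bytesPerEntry     = 16 + 8                           // MD5 hash + offset
>     val slots             = dedupeBufferSize / bytesPerEntry // 436906 (single cleaner thread)
>     val maxDesiredMapSize = (slots * 0.9).toInt              // 393215, as reported
>
> So any segment whose offset range spans more than ~393K offsets is rejected
> outright, regardless of how few messages it actually contains.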
> 7. Examine the entries in the replica segment:
> ./kafka-run-class.sh kafka.tools.DumpLogSegments \
>   --files 00000000000000000000.log --print-data-log
> There are only 218418 messages in that segment. However, Log Cleaner thinks
> there are 7206179 messages in it (as per the error above).
> The error stems from this line in LogCleaner.scala:
> "val segmentSize = segment.nextOffset() - segment.baseOffset"
> In the replica's log segment file (00000000000000000000.log), the ending
> offset is 7206178 and the beginning offset is 0. That makes Log Cleaner think
> there are 7206179 messages in that segment, although there are only 218418
> messages in it.
> IMO, to address this kind of scenario, LogCleaner.scala should check the
> actual number of messages in the segment instead of subtracting the beginning
> offset from the ending offset.
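> A minimal sketch of that idea (not the attached patch; it assumes the 0.9
> LogSegment API, where segment.log is a FileMessageSet that can be iterated
> entry by entry):
>
>     // Count the entries actually present in the segment instead of using
>     // the offset delta. Shallow iteration counts a compressed wrapper
>     // message as one entry, so for compressed data this is a lower bound.
>     def entriesInSegment(segment: LogSegment): Long = {
>       var count = 0L
>       val iter = segment.log.iterator // Iterator[MessageAndOffset]
>       while (iter.hasNext) { iter.next(); count += 1 }
>       count
>     }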
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)