[ 
https://issues.apache.org/jira/browse/KAFKA-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15273455#comment-15273455
 ] 

Edoardo Comar commented on KAFKA-3587:
--------------------------------------

[~alekar] I had a look at your patch. The approach you took is that of using 
one temp map per segment and one overall map, which I had considered too.
Do you think that allocating to the overall map just 1/2 the memory (w.r.t. the 
current approach ) is a good strategy?

The approach I and [~mimaison] are working on is to try and compact until the 
(only-one-per-thread) map is full, which is maximising the use of the memory 
allocated to the map.


> LogCleaner fails due to incorrect offset map computation on a replica
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-3587
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3587
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.1
>         Environment: Linux
>            Reporter: Kiran Pillarisetty
>            Assignee: Edoardo Comar
>         Attachments: 0001-POC-improving-deduping-segments.patch
>
>
> Log Cleaner fails to compact a segment even when the number of messages in it 
> is less than the offset map.
> In version 0.9.0.1, (LogCleaner.scala -> buildOffsetMap()), LogCleaner 
> computes segment size by subtracting segment's base offset from the latest 
> offset ("segmentSize = segment.nextOffset() - segment.baseOffset").  This 
> works fine until you create another replica. When you create a replica, it's 
> segment could contain data which is already compacted on other brokers. 
> Depending up on the type of data, offset difference could be too big, larger 
> than the offset map (maxDesiredMapSize), and that causes LogCleaner to fail 
> on that segment.
> Scenario:
> - Kafka 0.9.0.1
> - Cluster has two brokers.
> - Server.properties:
> log.cleaner.enable=true
> log.cleaner.dedupe.buffer.size=10485760 #10MB
> log.roll.ms=300000
> delete.topic.enable=true
> log.cleanup.policy=compact
> Steps to reproduce:
> 1. Create a topic with replication-factor of 1.
> ./kafka-topics.sh --zookeeper=localhost:2181 --create --topic 
> test.log.compact.1M --partitions 1 --replication-factor 1 --config 
> cleanup.policy=compact --config segment.ms=300000
> 2. Use kafka-console-producer.sh to produce a single message with the 
> following key:
> LC1,{"test": "xyz"}
> 3. Use  kafka-console-producer.sh to produce a large number of messages with 
> the following key:
> LC2,{"test": "abc"}
> 4. Let log cleaner run. Make sure log is compacted.  Verify with:
>  ./kafka-run-class.sh kafka.tools.DumpLogSegments  --files 
> 00000000000000000000.log  --print-data-log
> Dumping 00000000000000000000.log
> Starting offset: 0
> offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: 
> NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": 
> "xyz"}
> offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 
> compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 
> payload: {"test": "abc"}
> 5.  Increase Replication Factor to 2.  Followed these steps: 
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 6. Notice that log cleaner fails to compact the newly created replica with 
> the following error.
> [2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 7206179 messages in 
> segment test.log.compact.1M-0/00000000000000000000.log but offset map can fit 
> only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
>         at scala.Predef$.require(Predef.scala:219)
>         at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
>         at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
>         at 
> scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
>         at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
>         at kafka.log.Cleaner.clean(LogCleaner.scala:322)
>         at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
>         at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-04-18 14:49:45,601] INFO [kafka-log-cleaner-thread-0], Stopped  
> (kafka.log.LogCleaner)
> 7. Examine the entries in the replica segment:
> ./kafka-run-class.sh kafka.tools.DumpLogSegments --files 
> 00000000000000000000.log  --print-data-log
> There are only 218418 messages in that segment.
> However, Log Cleaner seems to think that there are 7206179 messages in that 
> segment (as per the above error)
> Error stems from this line in LogCleaner.scala:
> """val segmentSize = segment.nextOffset() - segment.baseOffset"""
> In Replica's log segment file ( 00000000000000000000.log), ending offset is 
> 7206178. Beginning offset is 0.  That makes Log Cleaner think that there are 
> 7206179 messages in that segment although there are only 218418 messages in 
> it.
> IMO,  to address this kind of scenario, LogCleaner.scala should check for the 
> number of messages in the segment, instead of subtracting beginning offset 
> from the ending offset.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to