[ https://issues.apache.org/jira/browse/KAFKA-3428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-3428:
-------------------------------
    Fix Version/s:     (was: 0.10.0.1)
                   0.10.1.0

> Remove metadata sync bottleneck from mirrormaker's producer
> -----------------------------------------------------------
>
>                 Key: KAFKA-3428
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3428
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.9.0.1
>            Reporter: Maysam Yabandeh
>             Fix For: 0.10.1.0
>
>
> Due to synchronization on the single shared producer, MM in a setup with 32
> consumer threads could not send more than 358k msg/sec and hence could not
> saturate the NIC. Profiling showed that producer.send takes 0.080 ms on
> average, which explains the ~358k msg/sec ceiling (32 threads / 0.080 ms per
> send is roughly 400k msg/sec). The following explains the bottleneck in
> producer.send and suggests how to improve it.
> The current implementation of MM relies on a single producer. For EACH
> message, producer.send() calls waitOnMetadata, which runs the following code
> against synchronized Metadata methods:
> {code}
>         // add topic to metadata topic list if it is not there already.
>         if (!this.metadata.containsTopic(topic))
>             this.metadata.add(topic);
> {code}
> Although this code is mostly a no-op (the topic is almost always already in
> the list), containsTopic is synchronized, so it becomes the bottleneck in MM.
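> For reference, the synchronized Metadata methods on this path look roughly
> like the following in 0.9.0.1 (paraphrased here, not a verbatim copy):
> {code}
>     // Both methods synchronize on the Metadata instance, so every
>     // producer.send() from every MM thread contends on the same monitor.
>     public synchronized boolean containsTopic(String topic) {
>         return this.topics.contains(topic);
>     }
>
>     public synchronized void add(String topic) {
>         topics.add(topic);
>     }
> {code}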
> Profiling highlights this bottleneck:
> {code}
> 100.0% - 65,539 ms kafka.tools.MirrorMaker$MirrorMakerThread.run
>   18.9% - 12,403 ms org.apache.kafka.clients.producer.KafkaProducer.send
>   13.8% - 9,056 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
>   12.1% - 7,933 ms org.apache.kafka.clients.Metadata.containsTopic
>   1.7% - 1,088 ms org.apache.kafka.clients.Metadata.fetch
>   2.6% - 1,729 ms org.apache.kafka.clients.Metadata.fetch
>   2.2% - 1,442 ms org.apache.kafka.clients.producer.internals.RecordAccumulator.append
> {code}
> After replacing this bottleneck with what is effectively an unsynchronized
> no-op, another run of the profiler shows that fetch is the next bottleneck:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy       132 s (54 %)   n/a     n/a
>       java.lang.Thread.run     50,776 ms (21 %)       n/a     n/a
>       org.apache.kafka.clients.Metadata.fetch  20,881 ms (8 %)        n/a     n/a
>   6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
>   6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.send
>   6.8% - 16,546 ms kafka.tools.MirrorMaker$MirrorMakerProducer.send
> {code}
> However, the fetch method does not need to be synchronized:
> {code}
>     public synchronized Cluster fetch() {
>         return this.cluster;
>     }
> {code}
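> One way to drop that lock, sketched here as a general pattern rather than the
> exact patch we applied (class and method names below are illustrative only),
> is to publish the cluster reference through a volatile field:
> {code}
> import org.apache.kafka.common.Cluster;
>
> // Sketch: readers do a plain volatile read instead of acquiring the
> // Metadata monitor; updates are rare and can stay synchronized.
> public final class MetadataFetchSketch {
>     private volatile Cluster cluster = Cluster.empty();
>
>     public Cluster fetch() {
>         return this.cluster;   // no monitor acquisition on the hot path
>     }
>
>     public synchronized void update(Cluster newCluster) {
>         this.cluster = newCluster;
>     }
> }
> {code}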
> Removing the sync from the fetch method shows that this bottleneck disappears:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy       249 s (78 %)   n/a     n/a
>       org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel   24,489 ms (7 %)        n/a     n/a
>       org.xerial.snappy.SnappyNative.rawUncompress     17,024 ms (5 %)        n/a     n/a
>       org.apache.kafka.clients.producer.internals.RecordAccumulator.append     13,817 ms (4 %)        n/a     n/a
>   4.3% - 13,817 ms org.apache.kafka.clients.producer.KafkaProducer.send
> {code}
> Internally we have applied a patch to remove this bottleneck. The patch does
> the following (see the sketch after this list):
> 1. replaces the HashSet of topics with a concurrent hash set
> 2. removes the sync from containsTopic and fetch
> 3. passes a copy of topics to getClusterForCurrentTopics, since this
> synchronized method accesses topics at two locations and topics being changed
> in between might break the semantics.
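> A minimal sketch of that approach (illustrative class and method names, not
> the actual patch):
> {code}
> import java.util.Collections;
> import java.util.HashSet;
> import java.util.Set;
> import java.util.concurrent.ConcurrentHashMap;
>
> // Sketch of points 1-3 above: a concurrent set makes the read-mostly
> // containsTopic lock-free, and the cluster-rebuild step works on a copy.
> public final class TopicRegistrySketch {
>     private final Set<String> topics =
>             Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
>
>     public boolean containsTopic(String topic) {   // no synchronization needed
>         return topics.contains(topic);
>     }
>
>     public void add(String topic) {
>         topics.add(topic);
>     }
>
>     // Hand a stable snapshot to getClusterForCurrentTopics-style logic so
>     // concurrent adds cannot change the set between its two accesses.
>     public Set<String> topicsSnapshot() {
>         return new HashSet<String>(topics);
>     }
> }
> {code}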
> Any interest in applying this patch? Any alternative suggestions?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
