[ https://issues.apache.org/jira/browse/KAFKA-3428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15205730#comment-15205730 ]
ASF GitHub Bot commented on KAFKA-3428:
---------------------------------------

GitHub user maysamyabandeh opened a pull request:

    https://github.com/apache/kafka/pull/1111

    KAFKA-3428 Remove metadata sync bottleneck from mirrormaker's producer

    Replace topics with a concurrent hash set so it does not need to be modified inside a synchronized method. Make cluster a volatile variable so fetch, which just returns the pointer, does not have to be synchronized.

    @ijuma @becketqin The contribution is my original work and I license the work to the project under the project's open source license.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maysamyabandeh/kafka KAFKA-3428

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/1111.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1111

----
commit 795b758977915b5a9098c7722f61b8ca5d318577
Author: Maysam Yabandeh <myaban...@dropbox.com>
Date:   2016-03-22T03:31:23Z

    Replace topics with a concurrent hash set so it does not need to be modified inside a synchronized method. Make cluster a volatile variable so fetch, which just returns the pointer, does not have to be synchronized.

----

> Remove metadata sync bottleneck from mirrormaker's producer
> -----------------------------------------------------------
>
>          Key: KAFKA-3428
>          URL: https://issues.apache.org/jira/browse/KAFKA-3428
>      Project: Kafka
>   Issue Type: Improvement
>     Reporter: Maysam Yabandeh
>
> Due to sync on the single producer, MM in a setup with 32 consumer threads
> could not send more than 358k msg/sec, and hence could not saturate the NIC.
> Profiling showed that producer.send takes 0.080 ms on average, which explains
> the bottleneck of 358k msg/sec. The following explains the bottleneck in
> producer.send and suggests how to improve it.
> Current impl of MM relies on a single producer. For EACH message,
> producer.send() calls waitOnMetadata, which runs the following synchronized
> method:
> {code}
> // add topic to metadata topic list if it is not there already.
> if (!this.metadata.containsTopic(topic))
>     this.metadata.add(topic);
> {code}
> Although the code is mostly a no-op, containsTopic is synchronized and thus
> becomes the bottleneck in MM. Profiling highlights this bottleneck:
> {code}
> 100.0% - 65,539 ms kafka.tools.MirrorMaker$MirrorMakerThread.run
>  18.9% - 12,403 ms org.apache.kafka.clients.producer.KafkaProducer.send
>  13.8% -  9,056 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
>  12.1% -  7,933 ms org.apache.kafka.clients.Metadata.containsTopic
>   1.7% -  1,088 ms org.apache.kafka.clients.Metadata.fetch
>   2.6% -  1,729 ms org.apache.kafka.clients.Metadata.fetch
>   2.2% -  1,442 ms org.apache.kafka.clients.producer.internals.RecordAccumulator.append
> {code}
> After replacing this bottleneck with a kind of no-op, another run of the
> profiler shows that fetch is the next bottleneck:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy     132 s (54 %)      n/a  n/a
> java.lang.Thread.run                         50,776 ms (21 %)  n/a  n/a
> org.apache.kafka.clients.Metadata.fetch      20,881 ms (8 %)   n/a  n/a
>   6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
>   6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.send
>   6.8% - 16,546 ms kafka.tools.MirrorMaker$MirrorMakerProducer.send
> {code}
> However, the fetch method does not need to be synchronized:
> {code}
> public synchronized Cluster fetch() {
>     return this.cluster;
> }
> {code}
> Removing sync from the fetch method makes this bottleneck disappear:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy      249 s (78 %)     n/a  n/a
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel  24,489 ms (7 %)  n/a  n/a
> org.xerial.snappy.SnappyNative.rawUncompress  17,024 ms (5 %)  n/a  n/a
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append  13,817 ms (4 %)  n/a  n/a
>   4.3% - 13,817 ms org.apache.kafka.clients.producer.KafkaProducer.send
> {code}
> Internally we have applied a patch to remove this bottleneck. The patch does
> the following:
> 1. replace HashSet with a concurrent hash set
> 2. remove sync from containsTopic and fetch
> 3. pass a copy of topics to getClusterForCurrentTopics, since this
> synchronized method accesses topics at two locations, and topics changing in
> between could break the semantics
> Any interest in applying this patch? Any alternative suggestions?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
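The three patch steps quoted above can be sketched in a few lines of Java. This is a hypothetical, simplified stand-in (the class name MetadataSketch and the trimmed-down Cluster are illustrative, not the actual Kafka classes), showing only the concurrency structure the patch describes:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplification of org.apache.kafka.clients.Metadata,
// illustrating the three patch steps; not the actual Kafka code.
public class MetadataSketch {

    // minimal stand-in for org.apache.kafka.common.Cluster
    static final class Cluster {
        final String id;
        Cluster(String id) { this.id = id; }
    }

    // step 1: a concurrent set replaces the plain HashSet, so the
    // per-message contains/add check needs no lock (step 2)
    private final Set<String> topics = ConcurrentHashMap.newKeySet();

    // step 2 (cont.): a volatile field lets fetch() return the current
    // pointer without synchronization
    private volatile Cluster cluster = new Cluster("bootstrap");

    // hot path, called for every message from waitOnMetadata
    public void maybeAddTopic(String topic) {
        if (!topics.contains(topic))
            topics.add(topic);
    }

    // no longer synchronized: a volatile read is enough to hand out the pointer
    public Cluster fetch() {
        return cluster;
    }

    // infrequent updates can stay synchronized; the volatile write
    // publishes the new cluster to the lock-free readers
    public synchronized void update(Cluster newCluster) {
        this.cluster = newCluster;
    }

    // step 3: copy topics once, so both uses inside this synchronized method
    // see one consistent snapshot even while other threads add topics
    public synchronized Cluster getClusterForCurrentTopics() {
        Set<String> snapshot = new HashSet<>(topics);
        // every place that previously read the live `topics` field would
        // read `snapshot` instead; here we just gate on it for illustration
        return snapshot.isEmpty() ? null : cluster;
    }

    public static void main(String[] args) {
        MetadataSketch m = new MetadataSketch();
        m.maybeAddTopic("mirror-topic");
        m.maybeAddTopic("mirror-topic"); // idempotent, lock-free
        System.out.println(m.fetch().id);   // prints bootstrap
        m.update(new Cluster("refreshed"));
        System.out.println(m.fetch().id);   // prints refreshed
    }
}
```

Note the trade-off this sketch keeps: writers (update, getClusterForCurrentTopics) remain synchronized, so only the per-message read path is made lock-free, which is where the profiles above place the contention.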