[ https://issues.apache.org/jira/browse/KAFKA-3428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma updated KAFKA-3428:
-------------------------------
    Fix Version/s:     (was: 0.10.1.0)
                       0.10.2.0

> Remove metadata sync bottleneck from mirrormaker's producer
> -----------------------------------------------------------
>
>                 Key: KAFKA-3428
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3428
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.9.0.1
>            Reporter: Maysam Yabandeh
>             Fix For: 0.10.2.0
>
>
> Due to synchronization on the single producer, MM in a setup with 32 consumer threads could not send more than 358k msg/sec and hence could not saturate the NIC. Profiling showed that producer.send takes 0.080 ms on average, which explains the 358k msg/sec ceiling. The following explains the bottleneck in producer.send and suggests how to improve it.
> The current impl of MM relies on a single producer. For EACH message, producer.send() calls waitOnMetadata, which runs the following code (containsTopic is a synchronized method):
> {code}
> // add topic to metadata topic list if it is not there already.
> if (!this.metadata.containsTopic(topic))
>     this.metadata.add(topic);
> {code}
> Although this code is mostly a no-op, containsTopic is synchronized and therefore becomes the bottleneck in MM.
> Profiling highlights this bottleneck:
> {code}
> 100.0% - 65,539 ms kafka.tools.MirrorMaker$MirrorMakerThread.run
>  18.9% - 12,403 ms org.apache.kafka.clients.producer.KafkaProducer.send
>  13.8% -  9,056 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
>  12.1% -  7,933 ms org.apache.kafka.clients.Metadata.containsTopic
>   1.7% -  1,088 ms org.apache.kafka.clients.Metadata.fetch
>   2.6% -  1,729 ms org.apache.kafka.clients.Metadata.fetch
>   2.2% -  1,442 ms org.apache.kafka.clients.producer.internals.RecordAccumulator.append
> {code}
> After replacing this bottleneck with a no-op, another run of the profiler shows that fetch is the next bottleneck:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy                                 132 s (54 %)      n/a  n/a
> java.lang.Thread.run                                                     50,776 ms (21 %)  n/a  n/a
> org.apache.kafka.clients.Metadata.fetch                                  20,881 ms (8 %)   n/a  n/a
>   6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
>   6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.send
>   6.8% - 16,546 ms kafka.tools.MirrorMaker$MirrorMakerProducer.send
> {code}
> However, the fetch method does not need to be synchronized:
> {code}
> public synchronized Cluster fetch() {
>     return this.cluster;
> }
> {code}
> Removing the synchronization from the fetch method shows that this bottleneck disappears as well:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy                                 249 s (78 %)      n/a  n/a
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel   24,489 ms (7 %)   n/a  n/a
> org.xerial.snappy.SnappyNative.rawUncompress                             17,024 ms (5 %)   n/a  n/a
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append     13,817 ms (4 %)   n/a  n/a
>   4.3% - 13,817 ms org.apache.kafka.clients.producer.KafkaProducer.send
> {code}
> Internally we have applied a patch to remove this bottleneck. The patch does the following:
> 1. replace the HashSet with a concurrent hash set
> 2. remove the synchronization from containsTopic and fetch
> 3. pass a copy of topics to getClusterForCurrentTopics, since this synchronized method accesses topics at two locations, and topics being changed in between might break the semantics
> Any interest in applying this patch? Any alternative suggestions?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
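For illustration, below is a minimal, self-contained Java sketch of the three patch items described in the issue: a concurrent set backing the topic list, lock-free containsTopic/fetch, and a snapshot of the topics handed to the cluster-filtering step. The names echo org.apache.kafka.clients.Metadata, but the Cluster stand-in, the update method, and the body of getClusterForCurrentTopics are simplified assumptions for the sketch, not the actual Kafka code.

{code}
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: mirrors the shape of org.apache.kafka.clients.Metadata to show
// how the hot-path reads (containsTopic, fetch) can avoid the object monitor.
public class LockFreeReadMetadataSketch {

    // Patch item 1: a concurrent set, so membership checks and additions are
    // thread-safe without holding the Metadata lock.
    private final Set<String> topics =
            Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    // Patch item 2 (fetch): publish the cluster through a volatile field so
    // readers always see the latest reference without synchronization.
    private volatile Cluster cluster = Cluster.empty();

    // Hot path, called from producer.send() -> waitOnMetadata(): no lock taken.
    public boolean containsTopic(String topic) {
        return topics.contains(topic);
    }

    public void add(String topic) {
        topics.add(topic);
    }

    // Hot path: a plain volatile read instead of a synchronized getter.
    public Cluster fetch() {
        return cluster;
    }

    // Metadata updates stay synchronized. Patch item 3: take one snapshot of
    // the topic set so concurrent additions cannot change it between the two
    // points where the update logic reads it.
    public synchronized void update(Cluster newCluster) {
        Set<String> topicsSnapshot = new HashSet<String>(topics);
        this.cluster = getClusterForCurrentTopics(newCluster, topicsSnapshot);
        notifyAll();
    }

    private Cluster getClusterForCurrentTopics(Cluster fullCluster, Set<String> topicsSnapshot) {
        // The real method filters the cluster down to the tracked topics; the
        // sketch returns it unchanged.
        return fullCluster;
    }

    // Stand-in for org.apache.kafka.common.Cluster, kept here only so the
    // sketch compiles on its own.
    public static class Cluster {
        public static Cluster empty() {
            return new Cluster();
        }
    }
}
{code}

The trade-off in this sketch is that metadata writers still serialize on the lock while readers pay only a volatile read and a ConcurrentHashMap lookup, which matches the profiler traces above where the contention sat entirely on the read side of producer.send.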