Ryan Cabral created KAFKA-12838:
-----------------------------------

             Summary: Kafka Broker - Request threads inefficiently blocking during produce
                 Key: KAFKA-12838
                 URL: https://issues.apache.org/jira/browse/KAFKA-12838
             Project: Kafka
          Issue Type: Improvement
          Components: core
    Affects Versions: 2.8.0, 2.7.0
            Reporter: Ryan Cabral


Hello, I have been using Kafka brokers for a while and have run into a problem with the way a Kafka broker handles produce requests. If there are multiple producers to the same topic and partition, any request handler thread handling a produce for that topic and partition becomes blocked until all requests ahead of it are done. Request handler threads for the entire broker can become exhausted waiting on the same partition lock, blocking requests for other partitions that would not have needed that lock.
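To make the failure mode concrete, here is a minimal, self-contained model (plain Scala, not Kafka code; all names are made up) of a fixed "request handler" pool with one hot partition whose per-partition lock serializes appends. The lone request for the other partition ends up waiting for a free handler even though its own lock is uncontended:

{code:scala}
import java.util.concurrent.Executors

object HandlerExhaustionSketch {
  // one lock object per partition, standing in for the log's internal lock
  private val partitionLocks = Array.fill(2)(new Object)

  private def append(partition: Int): Unit =
    partitionLocks(partition).synchronized {
      Thread.sleep(100) // stand-in for validation/recompression done under the lock
    }

  def main(args: Array[String]): Unit = {
    val handlers = Executors.newFixedThreadPool(4) // the "request handler" pool
    val start = System.nanoTime()

    // 20 produce requests for hot partition 0 tie up every handler thread...
    (1 to 20).foreach { _ =>
      handlers.submit(new Runnable { def run(): Unit = append(0) })
    }
    // ...so this single request for partition 1 waits for a free handler,
    // even though nothing else holds partition 1's lock.
    handlers.submit(new Runnable {
      def run(): Unit = {
        append(1)
        println(f"partition 1 append finished after ${(System.nanoTime() - start) / 1e6}%.0f ms")
      }
    })
    handlers.shutdown()
  }
}
{code}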

Once that starts happening, requests begin to back up, the request queue can reach its maximum, and network threads begin to be paused, cascading the problem a bit further. Overall performance ends up degraded. I'm not so focused on the cascade at the moment as I am on the initial contention. Intuitively I would expect lock contention on a single partition to ONLY affect throughput on that partition, not the entire broker.

 

The append call within the request handler originates here:

[https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/KafkaApis.scala#L638]

Further down the stack, the lock is acquired during append here:
[https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/log/Log.scala#L1165]

At this point the first request holds the lock for the duration of its append, and subsequent requests on the same partition block waiting for the lock, each tying up an IO thread (request handler).
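For reference, the shape of that code is roughly the following (a simplified paraphrase for illustration only, not the actual implementation): the expensive validation/recompression visible in the jstack below runs entirely inside the per-log lock.

{code:scala}
class LogSketch {
  // corresponds to the `lock` object in kafka.log.Log
  private val lock = new Object

  def append(records: Array[Byte]): Long = lock synchronized {
    // Validation and offset assignment (LogValidator.validateMessagesAndAssignOffsets),
    // which may recompress the whole batch (e.g. LZ4), happen while the lock is
    // held, so every other producer to this partition blocks at the monitor.
    val validated = records.clone()
    // ...append to the active segment, update the log end offset...
    validated.length.toLong
  }
}
{code}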

At first glance, it seems like it would make the most sense to be able to (perhaps via config) funnel produce requests for the same partition through a per-partition request queue of sorts and dispatch them such that at most one IO thread is tied up at a time for a given partition; see the sketch below. There are a number of reasons the lock could be held elsewhere too, but this should at least help mitigate the issue a bit. I'm assuming this is easier said than done and likely requires significant refactoring to achieve properly, but I'm hoping it is something that could end up on some sort of long-term roadmap.
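A very rough sketch of what I mean (hypothetical names like PartitionDispatcher are made up for illustration; this is not an existing Kafka API, and it glosses over purgatory, response callbacks, error handling, etc.):

{code:scala}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
import java.util.concurrent.atomic.AtomicBoolean

class PartitionDispatcher {
  private case class PartitionState(queue: ConcurrentLinkedQueue[() => Unit],
                                    draining: AtomicBoolean)

  private val states = new ConcurrentHashMap[Int, PartitionState]()

  /** Enqueue produce work for a partition. The calling request handler thread
    * only drains the queue if no other thread is already doing so, so at most
    * one thread is tied up per partition at any time. */
  def submit(partition: Int)(work: () => Unit): Unit = {
    val state = states.computeIfAbsent(partition, _ =>
      PartitionState(new ConcurrentLinkedQueue[() => Unit](), new AtomicBoolean(false)))
    state.queue.add(work)
    drain(state)
  }

  private def drain(state: PartitionState): Unit = {
    if (state.draining.compareAndSet(false, true)) {
      try {
        var task = state.queue.poll()
        while (task != null) {
          task()
          task = state.queue.poll()
        }
      } finally {
        state.draining.set(false)
      }
      // Re-check: work may have been enqueued between the last poll and the reset.
      if (!state.queue.isEmpty) drain(state)
    }
  }
}
{code}

The idea being that a request handler thread that finds the partition already being drained just enqueues its work and goes back to serving other partitions, instead of blocking on the monitor.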

 

Snippet from jstack. Almost all request handler threads (there are 256 of them, up from 25 to mitigate the issue) in the jstack are blocked waiting on the same lock due to the number of producers we have.

 
{noformat}
"data-plane-kafka-request-handler-254" #335 daemon prio=5 os_prio=0 
tid=0x00007fb1c9f13000 nid=0x53f1 runnable [0x00007fad35796000]
   java.lang.Thread.State: RUNNABLE
        at 
org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.<init>(KafkaLZ4BlockOutputStream.java:82)
        at 
org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.<init>(KafkaLZ4BlockOutputStream.java:125)
        at 
org.apache.kafka.common.record.CompressionType$4.wrapForOutput(CompressionType.java:101)
        at 
org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:134)
        at 
org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:170)
        at 
org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:508)
        at 
kafka.log.LogValidator$.buildRecordsAndAssignOffsets(LogValidator.scala:500)
        at 
kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:455)
        at 
kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:106)
        at kafka.log.Log.$anonfun$append$2(Log.scala:1126)
        - locked <0x00000004c9a6fd60> (a java.lang.Object)
        at kafka.log.Log.append(Log.scala:2387)
        at kafka.log.Log.appendAsLeader(Log.scala:1050)
        at 
kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
        at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067)
        at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953)
        at kafka.server.ReplicaManager$$Lambda$1078/1017241486.apply(Unknown 
Source)
        at 
scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
        at 
scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
        at scala.collection.mutable.HashMap.map(HashMap.scala:35)
        at 
kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:941)
        at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:621)
        at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:625)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:137)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
        at java.lang.Thread.run(Thread.java:748)


"data-plane-kafka-request-handler-253" #334 daemon prio=5 os_prio=0 
tid=0x00007fb1c9f11000 nid=0x53f0 waiting for monitor entry [0x00007fad35897000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at kafka.log.Log.$anonfun$append$2(Log.scala:1104)
        - waiting to lock <0x00000004c9a6fd60> (a java.lang.Object)
        at kafka.log.Log.append(Log.scala:2387)
        at kafka.log.Log.appendAsLeader(Log.scala:1050)
        at 
kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
        at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067)
        at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953)
        at kafka.server.ReplicaManager$$Lambda$1078/1017241486.apply(Unknown 
Source)
        at 
scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
        at 
scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
        at scala.collection.mutable.HashMap.map(HashMap.scala:35)
        at 
kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:941)
        at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:621)
        at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:625)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:137)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
        at java.lang.Thread.run(Thread.java:748){noformat}
 

 


