[ https://issues.apache.org/jira/browse/KAFKA-6431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ying Zheng updated KAFKA-6431:
------------------------------
    Description: 
Purgatory is the data structure in the Kafka broker that manages delayed 
operations. A ConcurrentHashMap (Kafka Pool) maps each operation key to the 
operations (held in a ConcurrentLinkedQueue) that are interested in that key.

When an operation is completed or expired, it is removed from its list 
(ConcurrentLinkedQueue). When the list becomes empty, the list itself is 
removed from the ConcurrentHashMap. This second removal has to be protected by 
a lock, to avoid adding new operations into a list that is being removed. This 
is currently done with a globally shared ReentrantReadWriteLock: all read 
operations on the purgatory have to acquire the read lock, while removing a 
list requires the write lock.
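
A minimal sketch of that arrangement (the class and method names here are 
illustrative only, not the actual broker code):

import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch of the current watcher pool, guarded by one global lock.
class WatcherPool<K, T> {
    private final ConcurrentHashMap<K, Queue<T>> watchers = new ConcurrentHashMap<>();
    // A single lock shared by the whole purgatory.
    private final ReentrantReadWriteLock removalLock = new ReentrantReadWriteLock();

    // Adding an operation only needs the (shared) read lock.
    void watch(K key, T operation) {
        removalLock.readLock().lock();
        try {
            watchers.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(operation);
        } finally {
            removalLock.readLock().unlock();
        }
    }

    // Removing an empty list needs the (exclusive) write lock, blocking the entire purgatory.
    void removeKeyIfEmpty(K key) {
        removalLock.writeLock().lock();
        try {
            Queue<T> queue = watchers.get(key);
            if (queue != null && queue.isEmpty()) {
                watchers.remove(key, queue);
            }
        } finally {
            removalLock.writeLock().unlock();
        }
    }
}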

Our profiling results show that the Kafka broker spends a nontrivial amount of 
time on this read-write lock.

The problem is exacerbated when there is a large number of short-lived 
operations. For example, when we are doing synchronous produce operations 
(acks=all), a DelayedProduce operation is added and then removed for each 
message. If the QPS of the topic is not high, it is very likely that, when the 
operation is completed and removed, the list for that key (topic partition) 
also becomes empty and has to be removed while holding the write lock. This 
blocks all read / write operations on the entire purgatory for a while. As 
there are tens of I/O threads accessing the purgatory concurrently, this shared 
lock can easily become a bottleneck.

In fact, we only need to prevent concurrent reads and writes on the same key; 
operations on different keys do not conflict with each other.

I suggest sharding the purgatory into smaller partitions and locking each 
partition independently.
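
A rough sketch of what the sharded pool could look like (again with 
illustrative names, not the actual patch). Each key is hashed to one of a 
fixed number of shards, so removing an empty list only blocks operations on 
keys that map to the same shard:

import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch of a sharded watcher pool with one plain lock per shard.
class ShardedWatcherPool<K, T> {
    private static final int NUM_SHARDS = 512;

    private final ConcurrentHashMap<K, Queue<T>>[] shards;
    private final ReentrantLock[] locks;

    @SuppressWarnings("unchecked")
    ShardedWatcherPool() {
        shards = new ConcurrentHashMap[NUM_SHARDS];
        locks = new ReentrantLock[NUM_SHARDS];
        for (int i = 0; i < NUM_SHARDS; i++) {
            shards[i] = new ConcurrentHashMap<>();
            locks[i] = new ReentrantLock();
        }
    }

    private int shardOf(K key) {
        return (key.hashCode() & 0x7fffffff) % NUM_SHARDS;
    }

    // Both add and remove lock only the shard that owns the key.
    void watch(K key, T operation) {
        int s = shardOf(key);
        locks[s].lock();
        try {
            shards[s].computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(operation);
        } finally {
            locks[s].unlock();
        }
    }

    void removeKeyIfEmpty(K key) {
        int s = shardOf(key);
        locks[s].lock();
        try {
            Queue<T> queue = shards[s].get(key);
            if (queue != null && queue.isEmpty()) {
                shards[s].remove(key, queue);
            }
        } finally {
            locks[s].unlock();
        }
    }
}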

Assuming there are 10 I/O threads actively accessing the purgatory, sharding it 
into 512 partitions makes the probability that a given thread contends with 
another thread on the same partition at any instant only about 2%. We can also 
use ReentrantLock instead of ReentrantReadWriteLock: when reads do not greatly 
outnumber writes, ReentrantLock has lower overhead than 
ReentrantReadWriteLock.
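
A back-of-envelope check of that 2% figure, under the simplifying assumption 
that each of the 10 threads independently touches one uniformly chosen 
partition at any instant (an idealized model, not a measurement):

public class ContentionEstimate {
    public static void main(String[] args) {
        int threads = 10;
        int partitions = 512;
        // Probability that a given thread shares its partition with at least one of the other 9.
        double p = 1.0 - Math.pow((partitions - 1.0) / partitions, threads - 1);
        System.out.printf("P(a given thread contends) ~ %.4f%n", p);   // ~ 0.0174, i.e. roughly 2%
    }
}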





  was:
Purgatory is the data structure in Kafka broker that manages delayed 
operations. There is a ConcurrentHashMap (Kafka Pool) maps each operation key 
to the operations (in a ConcurrentLinkedQueue) that are interested in the key.

When an operation is done or expired, it's removed from the list 
(ConcurrentLinkedQueue). When the list is empty, it's removed from the 
ConcurrentHashMap. The 2nd operation has to be protected by a lock, to avoid 
adding new operations into a list that is being removed. This is currently done 
by a globally shared ReentrantReadWriteLock. All the read operations on 
purgatory have to acquire the read permission of this lock. The list removing 
operations needs the write permission of this lock.

Our profiling result shows that Kafka broker is spending a nontrivial time on 
this read write lock.

The problem is exacerbated when there are a large amount of short operations. 
For example, when we are doing sync produce operations (acks=all), a 
DelayedProduce operation is added and then removed for each message. If the QPS 
of the topic is not high, it's very likely that, when the operation is done and 
removed, the list of that key (topic partitions) becomes empty, and has to be 
removed when holding the write lock. This operation blocks all the read / write 
operations on purgatory for awhile. As there are tens of IO threads accessing 
purgatory concurrently, this shared lock can easily become a bottleneck. 

Actually, we only want to avoid concurrent read / write on the same key. The 
operations on different keys do not conflict with each other.

I suggest to shard purgatory into smaller partitions, and lock each individual 
partition independently.

Assuming there are 10 io threads actively accessing purgatory, sharding 
purgatory into 512 partitions will make the probability for 2 threads accessing 
the same partition at the same time to about 2%. We also can use ReentrantLock 
instead of ReentrantReadWriteLock. When the read operations are not much more 
than write operations, ReentrantLock has lower overhead than 
ReentrantReadWriteLock.






> Lock contention in Purgatory
> ----------------------------
>
>                 Key: KAFKA-6431
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6431
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Ying Zheng
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
