[ 
https://issues.apache.org/jira/browse/KAFKA-14029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BugFinder updated KAFKA-14029:
------------------------------
    Priority: Minor  (was: Blocker)

> Consumer response serialization could block other response handlers at scale
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-14029
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14029
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>    Affects Versions: 3.2.0
>            Reporter: BugFinder
>            Priority: Minor
>
> Hi,
> We have been using our in-house tools to test Kafka's scalability, to get an
> idea of how a large-scale deployment will behave and where the bottlenecks
> are. For now, we are looking at version 3.2 and focusing on a many-consumers
> scenario.
> Consumer-wise, we want to report a possible issue and eventually propose a 
> solution, aiming to build our expertise in the system. When adding a new 
> consumer to a group, the code path
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle *// (has a synchronized block)*
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onLeaderElected
>   org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected
>    org.apache.kafka.clients.consumer.internals.ConsumerProtocol.serializeAssignment
>     org.apache.kafka.clients.consumer.internals.ConsumerProtocol.serializeAssignment
>      org.apache.kafka.common.protocol.MessageUtil.toVersionPrefixedByteBuffer *// linear in the size of the message*
> could end up being costly when the message is large.
> toVersionPrefixedByteBuffer appears to be linear in the size of the message,
> and although writing an array in linear time is entirely reasonable,
> {*}under certain conditions, e.g. while holding a lock{*}, it can cause
> {*}undesired contention{*}. In this case, it is invoked to serialize the
> assignment when a new consumer is added, and this path is itself wrapped in
> another loop whose iteration count depends on the number of assignments. That
> loop could become a second problematic dimension as the group grows, since the
> serialization work is nested inside it. The loop is here:
> [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected]
> // ...
> Map<String, ByteBuffer> groupAssignment = new HashMap<>();
> for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
>     ByteBuffer buffer = *ConsumerProtocol.serializeAssignment(assignmentEntry.getValue()); // calls toVersionPrefixedByteBuffer*
>     groupAssignment.put(assignmentEntry.getKey(), buffer);
> }
> // ...
> The question here is: {*}is there a need to serialize the assignment inside
> the synchronized block{*}? If the assignment is large enough, serializing it
> could easily add a few seconds to the request and block other handlers that
> use the same lock, such as HeartbeatResponseHandler.
>  
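The cost pattern described in the report can be approximated outside Kafka. The following is a minimal, self-contained Java sketch, not Kafka's code: `VersionPrefixedSketch`, `toVersionPrefixed`, `serializeAll` and the list-of-partition-names assignment format are hypothetical stand-ins for `MessageUtil.toVersionPrefixedByteBuffer`, `ConsumerProtocol.serializeAssignment` and the real `Assignment` type. Assuming, as in the real method, that a 2-byte version is written before the message body, every call touches every byte of its assignment, and the per-member loop in `onLeaderElected` makes the total work proportional to the sum of all assignment sizes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class VersionPrefixedSketch {
    // Hypothetical stand-in for toVersionPrefixedByteBuffer: a 2-byte version,
    // then each partition name as a 4-byte length followed by its UTF-8 bytes.
    // Both passes over the partitions are linear in the assignment size.
    static ByteBuffer toVersionPrefixed(short version, List<String> partitions) {
        int size = Short.BYTES;
        for (String p : partitions) {
            size += Integer.BYTES + p.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.putShort(version);
        for (String p : partitions) {
            byte[] bytes = p.getBytes(StandardCharsets.UTF_8);
            buffer.putInt(bytes.length);
            buffer.put(bytes);
        }
        buffer.flip(); // make the written bytes readable from position 0
        return buffer;
    }

    // Mirrors the shape of the onLeaderElected loop: one serialization per member,
    // so the total work is the sum of all assignment sizes.
    static Map<String, ByteBuffer> serializeAll(Map<String, List<String>> assignments) {
        Map<String, ByteBuffer> groupAssignment = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : assignments.entrySet()) {
            // (short) 1 is an arbitrary version chosen for illustration
            groupAssignment.put(entry.getKey(), toVersionPrefixed((short) 1, entry.getValue()));
        }
        return groupAssignment;
    }

    public static void main(String[] args) {
        Map<String, List<String>> assignments = new LinkedHashMap<>();
        for (int m = 0; m < 3; m++) {
            assignments.put("member-" + m, List.of("t-0", "t-1"));
        }
        int total = serializeAll(assignments).values().stream()
                .mapToInt(ByteBuffer::remaining).sum();
        System.out.println(total); // 3 members * (2 + 2 * (4 + 3)) = 48 bytes
    }
}
```

In this sketch, doubling either the number of members or the partitions per member roughly doubles the bytes written, which is the nesting the report refers to.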

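On the question of whether serialization must happen inside the synchronized block, here is a minimal sketch comparing two hypothetical variants: serializing while the lock is held, and copying the input under the lock then serializing after releasing it. It assumes the assignments can be safely snapshotted, which the report does not establish for Kafka's coordinator state. It uses `java.util.concurrent.locks.ReentrantLock` instead of `synchronized` only so that `tryLock()` can probe, without blocking, whether a competing handler (playing the role of HeartbeatResponseHandler) could take the lock mid-serialization; all class and method names here are illustrative and are not part of the Kafka client.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class SerializeOutsideLockSketch {
    // Stand-in for the coordinator's monitor. The Kafka client uses synchronized;
    // ReentrantLock is used here only so tryLock() can probe for contention.
    static final ReentrantLock COORDINATOR_LOCK = new ReentrantLock();

    // Hypothetical slow serializer: signals 'started', then waits for 'release'
    // to simulate serializing a very large assignment.
    static ByteBuffer slowSerialize(String assignment, CountDownLatch started,
                                    CountDownLatch release) throws InterruptedException {
        started.countDown();
        release.await();
        return ByteBuffer.wrap(assignment.getBytes(StandardCharsets.UTF_8));
    }

    // Current shape: every serialization runs while the lock is held.
    static Map<String, ByteBuffer> serializeUnderLock(Map<String, String> assignments,
            CountDownLatch started, CountDownLatch release) throws InterruptedException {
        COORDINATOR_LOCK.lock();
        try {
            Map<String, ByteBuffer> out = new HashMap<>();
            for (Map.Entry<String, String> e : assignments.entrySet()) {
                out.put(e.getKey(), slowSerialize(e.getValue(), started, release));
            }
            return out;
        } finally {
            COORDINATOR_LOCK.unlock();
        }
    }

    // Alternative: copy the input under the lock, serialize after releasing it.
    static Map<String, ByteBuffer> serializeOutsideLock(Map<String, String> assignments,
            CountDownLatch started, CountDownLatch release) throws InterruptedException {
        Map<String, String> snapshot;
        COORDINATOR_LOCK.lock();
        try {
            snapshot = new HashMap<>(assignments);
        } finally {
            COORDINATOR_LOCK.unlock();
        }
        Map<String, ByteBuffer> out = new HashMap<>();
        for (Map.Entry<String, String> e : snapshot.entrySet()) {
            out.put(e.getKey(), slowSerialize(e.getValue(), started, release));
        }
        return out;
    }

    // Runs one variant on a worker thread and reports whether a competing handler
    // can acquire the lock while serialization is in progress.
    static boolean heartbeatCanProceed(boolean serializeOutside) {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Map<String, String> assignments = Map.of("member-1", "topic-0");
        Thread worker = new Thread(() -> {
            try {
                if (serializeOutside) {
                    serializeOutsideLock(assignments, started, release);
                } else {
                    serializeUnderLock(assignments, started, release);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            started.await();                               // serialization is in progress
            boolean acquired = COORDINATOR_LOCK.tryLock(); // non-blocking probe
            if (acquired) {
                COORDINATOR_LOCK.unlock();
            }
            release.countDown();                           // let serialization finish
            worker.join();
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("under lock:   heartbeat proceeds = " + heartbeatCanProceed(false));
        System.out.println("outside lock: heartbeat proceeds = " + heartbeatCanProceed(true));
    }
}
```

Running `main` prints `false` for the first variant and `true` for the second. The snapshot-then-serialize shape only helps if the serialized data does not need to stay consistent with state that other handlers may change once the lock is released; a real change to the coordinator would have to verify that.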


--
This message was sent by Atlassian Jira
(v8.20.7#820007)
