[ https://issues.apache.org/jira/browse/KAFKA-14029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
BugFinder updated KAFKA-14029:
------------------------------
Priority: Minor (was: Blocker)
> Consumer response serialization could block other response handlers at scale
> ----------------------------------------------------------------------------
>
> Key: KAFKA-14029
> URL: https://issues.apache.org/jira/browse/KAFKA-14029
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Affects Versions: 3.2.0
> Reporter: BugFinder
> Priority: Minor
>
> Hi,
> We have been using our in-house tools to test Kafka's scalability, to get an
> idea of how a large-scale deployment behaves and where the bottlenecks are.
> For now, we are looking at version 3.2 and focusing on a many-consumers
> scenario.
> Consumer-wise, we want to report a possible issue and eventually propose a
> solution, aiming to build our expertise in the system. When adding a new
> consumer to a group, the code path
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle *// (has a synchronized block)*
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onLeaderElected
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected
>
> org.apache.kafka.clients.consumer.internals.ConsumerProtocol.serializeAssignment
>
> org.apache.kafka.clients.consumer.internals.ConsumerProtocol.serializeAssignment
>
> org.apache.kafka.common.protocol.MessageUtil.toVersionPrefixedByteBuffer *// linear in the size of the message*
> could become costly when the message is large.
> toVersionPrefixedByteBuffer appears to be linear in the size of the message.
> Writing an array in linear time is perfectly reasonable in itself, but
> {*}under certain conditions, e.g. while holding a lock{*}, it can cause
> {*}undesired contention{*}. Here it is invoked to serialize each assignment
> when a new consumer joins, inside a loop over all assignments. That loop is
> a second potentially problematic dimension: its cost grows with the number
> of assignments, so the per-message cost ends up nested inside a per-member
> loop. The loop is in
> [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected]:
> // ...
> Map<String, ByteBuffer> groupAssignment = new HashMap<>();
> for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
>     ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue()); // calls toVersionPrefixedByteBuffer
>     groupAssignment.put(assignmentEntry.getKey(), buffer);
> }
> // ...
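To make the cost model of that loop concrete, here is a minimal, self-contained sketch. The `Assignment` record and the byte layout produced by `serializeAssignment` (a 2-byte version prefix, a 4-byte partition count, then one 4-byte int per partition) are hypothetical simplifications, not Kafka's actual `ConsumerProtocol` wire format. The point is only that each call allocates and fills a buffer proportional to one assignment, so the whole loop is linear in the combined size of all assignments.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSerializationSketch {

    // Hypothetical, simplified stand-in for an assignment: just partition ids.
    record Assignment(List<Integer> partitions) {}

    // Hypothetical version-prefixed encoding (NOT Kafka's real format):
    // 2-byte version, 4-byte partition count, one 4-byte int per partition.
    static ByteBuffer serializeAssignment(Assignment assignment, short version) {
        List<Integer> partitions = assignment.partitions();
        ByteBuffer buffer = ByteBuffer.allocate(2 + 4 + 4 * partitions.size());
        buffer.putShort(version);
        buffer.putInt(partitions.size());
        for (int p : partitions) {   // O(size of this one assignment)
            buffer.putInt(p);
        }
        buffer.flip();               // make the buffer readable from position 0
        return buffer;
    }

    // Mirrors the shape of the loop in onLeaderElected: one serialization per
    // member, so total work is O(sum of all assignment sizes).
    static Map<String, ByteBuffer> serializeAll(Map<String, Assignment> assignments) {
        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
        for (Map.Entry<String, Assignment> entry : assignments.entrySet()) {
            groupAssignment.put(entry.getKey(), serializeAssignment(entry.getValue(), (short) 0));
        }
        return groupAssignment;
    }

    public static void main(String[] args) {
        Map<String, Assignment> assignments = new LinkedHashMap<>();
        assignments.put("consumer-1", new Assignment(List.of(0, 1, 2)));
        assignments.put("consumer-2", new Assignment(List.of(3, 4)));
        long totalBytes = 0;
        for (ByteBuffer b : serializeAll(assignments).values()) {
            totalBytes += b.remaining();
        }
        System.out.println("total bytes: " + totalBytes); // prints "total bytes: 32"
    }
}
```

Doubling the number of partitions per member, or the number of members, roughly doubles the bytes written, and in the real code all of that work happens on the path the report identifies as running under a lock.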
> The question here is: {*}does the assignment need to be serialized inside
> the synchronized block{*}? If the assignment is very large, serialization
> could easily add a few seconds to the request and block other handlers that
> use the same lock, such as HeartbeatResponseHandler.
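One common way to answer that question is to do the size-linear work before taking the lock and hold the lock only long enough to publish the result. The sketch below contrasts the two shapes. It is a generic illustration under stated assumptions, not a patch for Kafka: the single `lock` object stands in for whatever monitor the join/heartbeat handlers synchronize on, and `serialize`, `currentAssignment`, and the `String` assignments are hypothetical simplifications.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SerializeOutsideLockSketch {

    private final Object lock = new Object();   // stands in for the shared coordinator monitor
    private Map<String, ByteBuffer> pendingAssignment = Map.of();   // guarded by lock
    private long lastHeartbeatMs;                                  // guarded by lock

    // Hypothetical serializer: the expensive step, linear in the input size.
    static ByteBuffer serialize(String assignment) {
        return ByteBuffer.wrap(assignment.getBytes(StandardCharsets.UTF_8));
    }

    // Before: serialization runs while holding the lock, so a concurrent
    // onHeartbeat() must wait for the whole loop to finish.
    void onLeaderElectedHoldingLock(Map<String, String> assignments) {
        synchronized (lock) {
            Map<String, ByteBuffer> result = new HashMap<>();
            for (Map.Entry<String, String> e : assignments.entrySet()) {
                result.put(e.getKey(), serialize(e.getValue()));
            }
            pendingAssignment = Map.copyOf(result);
        }
    }

    // After: serialize first without the lock, then lock only to publish.
    // Correct only if `assignments` is not mutated concurrently and the
    // serialized bytes do not depend on state guarded by the lock.
    void onLeaderElectedSerializeFirst(Map<String, String> assignments) {
        Map<String, ByteBuffer> result = new HashMap<>();
        for (Map.Entry<String, String> e : assignments.entrySet()) {
            result.put(e.getKey(), serialize(e.getValue()));
        }
        synchronized (lock) {
            pendingAssignment = Map.copyOf(result);
        }
    }

    void onHeartbeat(long nowMs) {
        synchronized (lock) { lastHeartbeatMs = nowMs; }
    }

    long lastHeartbeatMs() {
        synchronized (lock) { return lastHeartbeatMs; }
    }

    Map<String, ByteBuffer> currentAssignment() {
        synchronized (lock) { return pendingAssignment; }
    }
}
```

Both methods publish the same result; the difference is only how long `onHeartbeat` can be kept waiting. In the real coordinator the precondition in the comment is the hard part: any state the serialized assignment depends on (generation, membership) would have to be captured consistently before the lock is released.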
>