[
https://issues.apache.org/jira/browse/KAFKA-6622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393695#comment-16393695
]
ASF GitHub Bot commented on KAFKA-6622:
---------------------------------------
hachikuji closed pull request #4661: KAFKA-6622 - fix performance issue in
parsing consumer offsets
URL: https://github.com/apache/kafka/pull/4661
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 3b79544a502..63af1cb0ce9 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -555,8 +555,11 @@ class GroupMetadataManager(brokerId: Int,
           }
           pendingOffsets.remove(batch.producerId)
         } else {
+          var batchBaseOffset: Option[Long] = None
           for (record <- batch.asScala) {
             require(record.hasKey, "Group metadata/offset entry key should not be null")
+            if (batchBaseOffset.isEmpty)
+              batchBaseOffset = Some(record.offset)
             GroupMetadataManager.readMessageKey(record.key) match {
               case offsetKey: OffsetKey =>
@@ -573,9 +576,9 @@ class GroupMetadataManager(brokerId: Int,
                 } else {
                   val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value)
                   if (isTxnOffsetCommit)
-                    pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
+                    pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
                   else
-                    loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
+                    loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
                 }
               case groupMetadataKey: GroupMetadataKey =>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly
> ------------------------------------------------------------------------------
>
> Key: KAFKA-6622
> URL: https://issues.apache.org/jira/browse/KAFKA-6622
> Project: Kafka
> Issue Type: Bug
> Reporter: radai rosenblatt
> Assignee: radai rosenblatt
> Priority: Major
> Attachments: kafka batch iteration funtime.png
>
>
> When reading records from a consumer offsets batch, the entire batch is
> decompressed once per record as a side effect of calling
> `batch.baseOffset`. This is a very expensive operation that is called in a
> loop for no reason:
> !kafka batch iteration funtime.png!
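The pattern described in the report can be sketched in isolation as below. This is only an illustration: `Record`, `CountingBatch`, `loadPerRecord` and `loadCached` are hypothetical stand-ins, not Kafka classes, and the call counter merely simulates the cost of an accessor that has to decompress the batch. Like the patch, the sketch assumes that the offset of the first record in a batch is the value wanted as the base offset.

```scala
import scala.collection.mutable

object BaseOffsetSketch {
  // Hypothetical stand-in for a record in an offsets batch.
  final case class Record(offset: Long, key: String)

  // Hypothetical stand-in for a batch whose baseOffset accessor is expensive.
  // The counter records how often the "expensive" accessor is invoked.
  final class CountingBatch(val records: Seq[Record]) {
    var baseOffsetCalls: Int = 0
    def baseOffset: Long = {
      baseOffsetCalls += 1 // simulates the cost of decompressing the batch
      records.head.offset
    }
  }

  // Before: the expensive accessor is called once for every record.
  def loadPerRecord(batch: CountingBatch): mutable.Map[String, Option[Long]] = {
    val loaded = mutable.Map.empty[String, Option[Long]]
    for (record <- batch.records)
      loaded.put(record.key, Some(batch.baseOffset))
    loaded
  }

  // After: the offset of the first record is remembered and reused,
  // so the expensive accessor is never called inside the loop.
  def loadCached(batch: CountingBatch): mutable.Map[String, Option[Long]] = {
    val loaded = mutable.Map.empty[String, Option[Long]]
    var batchBaseOffset: Option[Long] = None
    for (record <- batch.records) {
      if (batchBaseOffset.isEmpty)
        batchBaseOffset = Some(record.offset)
      loaded.put(record.key, batchBaseOffset)
    }
    loaded
  }
}
```

Both functions build the same map. The difference is that the first calls the accessor once per record, while the second does not call it at all.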
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)