[ 
https://issues.apache.org/jira/browse/KAFKA-6622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393695#comment-16393695
 ] 

ASF GitHub Bot commented on KAFKA-6622:
---------------------------------------

hachikuji closed pull request #4661: KAFKA-6622 - fix performance issue in 
parsing consumer offsets
URL: https://github.com/apache/kafka/pull/4661
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 3b79544a502..63af1cb0ce9 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -555,8 +555,11 @@ class GroupMetadataManager(brokerId: Int,
               }
               pendingOffsets.remove(batch.producerId)
             } else {
+              var batchBaseOffset: Option[Long] = None
               for (record <- batch.asScala) {
                 require(record.hasKey, "Group metadata/offset entry key should not be null")
+                if (batchBaseOffset.isEmpty)
+                  batchBaseOffset = Some(record.offset)
                 GroupMetadataManager.readMessageKey(record.key) match {
 
                   case offsetKey: OffsetKey =>
@@ -573,9 +576,9 @@ class GroupMetadataManager(brokerId: Int,
                     } else {
                       val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value)
                       if (isTxnOffsetCommit)
-                        pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
+                        pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
                       else
-                        loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
+                        loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
                     }
 
                   case groupMetadataKey: GroupMetadataKey =>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-6622
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6622
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: radai rosenblatt
>            Assignee: radai rosenblatt
>            Priority: Major
>         Attachments: kafka batch iteration funtime.png
>
>
> When reading records from a consumer offsets batch, the entire batch is 
> decompressed multiple times (once per record) as part of calling 
> `batch.baseOffset`. This is a very expensive operation that is called in a 
> loop for no reason:
> !kafka batch iteration funtime.png!
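
The pattern described in the issue, and the change made in the diff above, can be sketched in isolation. This is only an illustration under stated assumptions: `OffsetRecord`, `CountingBatch`, and `BaseOffsetSketch` are hypothetical stand-ins rather than Kafka classes, and the call counter merely models the decompression cost the issue reports for `batch.baseOffset`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Kafka record; not the real API.
final case class OffsetRecord(offset: Long, key: String, value: String)

// Hypothetical stand-in for a record batch. It counts baseOffset calls so the
// cost the issue describes (assumed here, not measured) becomes visible.
final class CountingBatch(val records: Seq[OffsetRecord]) {
  var baseOffsetCalls: Int = 0

  def baseOffset: Long = {
    baseOffsetCalls += 1 // in the reported case, each call decompresses the batch
    records.head.offset
  }
}

object BaseOffsetSketch {
  type Loaded = mutable.Map[String, (Option[Long], String)]

  // Before the change: one baseOffset call per record in the loop.
  def loadPerRecord(batch: CountingBatch): Loaded = {
    val loaded: Loaded = mutable.Map.empty
    for (record <- batch.records)
      loaded.put(record.key, (Some(batch.baseOffset), record.value))
    loaded
  }

  // After the change: remember the first record's offset and reuse it,
  // mirroring the batchBaseOffset variable introduced by the diff.
  def loadOnce(batch: CountingBatch): Loaded = {
    val loaded: Loaded = mutable.Map.empty
    var batchBaseOffset: Option[Long] = None
    for (record <- batch.records) {
      if (batchBaseOffset.isEmpty)
        batchBaseOffset = Some(record.offset)
      loaded.put(record.key, (batchBaseOffset, record.value))
    }
    loaded
  }

  def main(args: Array[String]): Unit = {
    val records = Seq(OffsetRecord(10L, "a", "x"), OffsetRecord(11L, "b", "y"))
    val before = new CountingBatch(records)
    val after = new CountingBatch(records)
    println(loadPerRecord(before) == loadOnce(after))
    println(s"per-record calls: ${before.baseOffsetCalls}, hoisted calls: ${after.baseOffsetCalls}")
  }
}
```

In this sketch both loaders produce the same map, while the number of `baseOffset` calls grows with the batch size only in the per-record variant.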



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
