This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new b7cab2d  MINOR: Correctly mark offset expiry in GroupMetadataManager's 
OffsetExpired metric
b7cab2d is described below

commit b7cab2d66f017464d3242525643a2afe2c28e924
Author: Stanislav Kozlovski <[email protected]>
AuthorDate: Fri Nov 1 14:18:48 2019 +0530

    MINOR: Correctly mark offset expiry in GroupMetadataManager's OffsetExpired 
metric
    
    We would mistakenly increment the `OffsetCommits` metric instead
    
    Author: Stanislav Kozlovski <[email protected]>
    
    Reviewers: David Jacot <[email protected]>, Manikumar Reddy 
<[email protected]>
    
    Closes #7624 from 
stanislavkozlovski/minor-fix-group-coordinator-offset-expiry-metric
    
    (cherry picked from commit 72282ed1988e33000f91672a72c7356e26658241)
    Signed-off-by: Manikumar Reddy <[email protected]>
---
 .../scala/kafka/coordinator/group/GroupCoordinator.scala   | 14 +++++++-------
 .../kafka/coordinator/group/GroupMetadataManager.scala     |  2 +-
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 22f15f9..24a1780 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -545,7 +545,7 @@ class GroupCoordinator(val brokerId: Int,
   def handleDeleteOffsets(groupId: String, partitions: Seq[TopicPartition]): 
(Errors, Map[TopicPartition, Errors]) = {
     var groupError: Errors = Errors.NONE
     var partitionErrors: Map[TopicPartition, Errors] = Map()
-    var partitionEligibleForDeletion: Seq[TopicPartition] = Seq()
+    var partitionsEligibleForDeletion: Seq[TopicPartition] = Seq()
 
     validateGroupStatus(groupId, ApiKeys.OFFSET_DELETE) match {
       case Some(error) =>
@@ -565,13 +565,13 @@ class GroupCoordinator(val brokerId: Int,
                     Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR
 
                 case Empty =>
-                  partitionEligibleForDeletion = partitions
+                  partitionsEligibleForDeletion = partitions
 
                 case PreparingRebalance | CompletingRebalance | Stable if 
group.isConsumerGroup =>
                   val (consumed, notConsumed) =
                     partitions.partition(tp => 
group.isSubscribedToTopic(tp.topic()))
 
-                  partitionEligibleForDeletion = notConsumed
+                  partitionsEligibleForDeletion = notConsumed
                   partitionErrors = consumed.map(_ -> 
Errors.GROUP_SUBSCRIBED_TO_TOPIC).toMap
 
                 case _ =>
@@ -579,16 +579,16 @@ class GroupCoordinator(val brokerId: Int,
               }
             }
 
-            if (partitionEligibleForDeletion.nonEmpty) {
+            if (partitionsEligibleForDeletion.nonEmpty) {
               val offsetsRemoved = 
groupManager.cleanupGroupMetadata(Seq(group), group => {
-                group.removeOffsets(partitionEligibleForDeletion)
+                group.removeOffsets(partitionsEligibleForDeletion)
               })
 
-              partitionErrors ++= partitionEligibleForDeletion.map(_ -> 
Errors.NONE).toMap
+              partitionErrors ++= partitionsEligibleForDeletion.map(_ -> 
Errors.NONE).toMap
 
               offsetDeletionSensor.record(offsetsRemoved)
 
-              info(s"The following offsets of the group $groupId were deleted: 
${partitionEligibleForDeletion.mkString(", ")}. " +
+              info(s"The following offsets of the group $groupId were deleted: 
${partitionsEligibleForDeletion.mkString(", ")}. " +
                 s"A total of $offsetsRemoved offsets were removed.")
             }
         }
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 2cbf7c8..a143669 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -766,7 +766,7 @@ class GroupMetadataManager(brokerId: Int,
     val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, 
group => {
       group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)
     })
-    offsetCommitsSensor.record(numOffsetsRemoved)
+    offsetExpiredSensor.record(numOffsetsRemoved)
     info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() 
- currentTimestamp} milliseconds.")
   }
 

Reply via email to