This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new b7cab2d MINOR: Correctly mark offset expiry in GroupMetadataManager's
OffsetExpired metric
b7cab2d is described below
commit b7cab2d66f017464d3242525643a2afe2c28e924
Author: Stanislav Kozlovski <[email protected]>
AuthorDate: Fri Nov 1 14:18:48 2019 +0530
MINOR: Correctly mark offset expiry in GroupMetadataManager's OffsetExpired
metric
Previously, expiring offsets would mistakenly increment the `OffsetCommits` metric instead of the `OffsetExpired` metric.
Author: Stanislav Kozlovski <[email protected]>
Reviewers: David Jacot <[email protected]>, Manikumar Reddy
<[email protected]>
Closes #7624 from
stanislavkozlovski/minor-fix-group-coordinator-offset-expiry-metric
(cherry picked from commit 72282ed1988e33000f91672a72c7356e26658241)
Signed-off-by: Manikumar Reddy <[email protected]>
---
.../scala/kafka/coordinator/group/GroupCoordinator.scala | 14 +++++++-------
.../kafka/coordinator/group/GroupMetadataManager.scala | 2 +-
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 22f15f9..24a1780 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -545,7 +545,7 @@ class GroupCoordinator(val brokerId: Int,
def handleDeleteOffsets(groupId: String, partitions: Seq[TopicPartition]):
(Errors, Map[TopicPartition, Errors]) = {
var groupError: Errors = Errors.NONE
var partitionErrors: Map[TopicPartition, Errors] = Map()
- var partitionEligibleForDeletion: Seq[TopicPartition] = Seq()
+ var partitionsEligibleForDeletion: Seq[TopicPartition] = Seq()
validateGroupStatus(groupId, ApiKeys.OFFSET_DELETE) match {
case Some(error) =>
@@ -565,13 +565,13 @@ class GroupCoordinator(val brokerId: Int,
Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR
case Empty =>
- partitionEligibleForDeletion = partitions
+ partitionsEligibleForDeletion = partitions
case PreparingRebalance | CompletingRebalance | Stable if
group.isConsumerGroup =>
val (consumed, notConsumed) =
partitions.partition(tp =>
group.isSubscribedToTopic(tp.topic()))
- partitionEligibleForDeletion = notConsumed
+ partitionsEligibleForDeletion = notConsumed
partitionErrors = consumed.map(_ ->
Errors.GROUP_SUBSCRIBED_TO_TOPIC).toMap
case _ =>
@@ -579,16 +579,16 @@ class GroupCoordinator(val brokerId: Int,
}
}
- if (partitionEligibleForDeletion.nonEmpty) {
+ if (partitionsEligibleForDeletion.nonEmpty) {
val offsetsRemoved =
groupManager.cleanupGroupMetadata(Seq(group), group => {
- group.removeOffsets(partitionEligibleForDeletion)
+ group.removeOffsets(partitionsEligibleForDeletion)
})
- partitionErrors ++= partitionEligibleForDeletion.map(_ ->
Errors.NONE).toMap
+ partitionErrors ++= partitionsEligibleForDeletion.map(_ ->
Errors.NONE).toMap
offsetDeletionSensor.record(offsetsRemoved)
- info(s"The following offsets of the group $groupId were deleted:
${partitionEligibleForDeletion.mkString(", ")}. " +
+ info(s"The following offsets of the group $groupId were deleted:
${partitionsEligibleForDeletion.mkString(", ")}. " +
s"A total of $offsetsRemoved offsets were removed.")
}
}
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 2cbf7c8..a143669 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -766,7 +766,7 @@ class GroupMetadataManager(brokerId: Int,
val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values,
group => {
group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)
})
- offsetCommitsSensor.record(numOffsetsRemoved)
+ offsetExpiredSensor.record(numOffsetsRemoved)
info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds()
- currentTimestamp} milliseconds.")
}