This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 1cabef0 MINOR: Refactor GroupMetadataManager cleanupGroupMetadata (#4504) 1cabef0 is described below commit 1cabef0d3dc7a3c245f260b8d34a60d7d044bb9c Author: Vahid Hashemian <vahidhashem...@us.ibm.com> AuthorDate: Thu Feb 22 08:19:13 2018 -0800 MINOR: Refactor GroupMetadataManager cleanupGroupMetadata (#4504) Refactoring avoids the need to call this method with a infinity as current time to remove all group offsets (when manually deleting the group). --- .../kafka/coordinator/group/GroupCoordinator.scala | 11 ++++-- .../kafka/coordinator/group/GroupMetadata.scala | 3 +- .../coordinator/group/GroupMetadataManager.scala | 39 ++++++++++++---------- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 5ae8552..4e605e2 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -375,9 +375,11 @@ class GroupCoordinator(val brokerId: Int, } if (eligibleGroups.nonEmpty) { - groupManager.cleanupGroupMetadata(None, eligibleGroups, Long.MaxValue) + val offsetsRemoved = groupManager.cleanupGroupMetadata(eligibleGroups, group => { + group.removeAllOffsets() + }) groupErrors ++= eligibleGroups.map(_.groupId -> Errors.NONE).toMap - info(s"The following groups were deleted: ${eligibleGroups.map(_.groupId).mkString(", ")}") + info(s"The following groups were deleted: ${eligibleGroups.map(_.groupId).mkString(", ")}. A total of $offsetsRemoved offsets were removed.") } groupErrors @@ -568,7 +570,10 @@ class GroupCoordinator(val brokerId: Int, } def handleDeletedPartitions(topicPartitions: Seq[TopicPartition]) { - groupManager.cleanupGroupMetadata(Some(topicPartitions), groupManager.currentGroups, time.milliseconds()) + val offsetsRemoved = groupManager.cleanupGroupMetadata(groupManager.currentGroups, group => { + group.removeOffsets(topicPartitions) + }) + info(s"Removed $offsetsRemoved offsets associated with deleted partitions: ${topicPartitions.mkString(", ")}.") } private def validateGroup(groupId: String): Option[Errors] = { diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 07d14f4..2b9c91f 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -421,9 +421,10 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def hasPendingOffsetCommitsFromProducer(producerId: Long) = pendingTransactionalOffsetCommits.contains(producerId) + def removeAllOffsets(): immutable.Map[TopicPartition, OffsetAndMetadata] = removeOffsets(offsets.keySet.toSeq) + def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = { topicPartitions.flatMap { topicPartition => - pendingOffsetCommits.remove(topicPartition) pendingTransactionalOffsetCommits.foreach { case (_, pendingOffsets) => pendingOffsets.remove(topicPartition) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 3391fc3..3b79544 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -713,22 +713,27 @@ class GroupMetadataManager(brokerId: Int, // visible for testing private[group] def cleanupGroupMetadata(): Unit = { - cleanupGroupMetadata(None, groupMetadataCache.values, time.milliseconds()) + val startMs = time.milliseconds() + val offsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => { + group.removeExpiredOffsets(time.milliseconds()) + }) + info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.") } - def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]], - groups: Iterable[GroupMetadata], - startMs: Long) { + /** + * This function is used to clean up group offsets given the groups and also a function that performs the offset deletion. + * @param groups Groups whose metadata are to be cleaned up + * @param selector A function that implements deletion of (all or part of) group offsets. This function is called while + * a group lock is held, therefore there is no need for the caller to also obtain a group lock. + * @return The cumulative number of offsets removed + */ + def cleanupGroupMetadata(groups: Iterable[GroupMetadata], selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata]): Int = { var offsetsRemoved = 0 groups.foreach { group => val groupId = group.groupId val (removedOffsets, groupIsDead, generation) = group.inLock { - val removedOffsets = deletedTopicPartitions match { - case Some(topicPartitions) => group.removeOffsets(topicPartitions) - case None => group.removeExpiredOffsets(startMs) - } - + val removedOffsets = selector(group) if (group.is(Empty) && !group.hasOffsets) { info(s"Group $groupId transitioned to Dead in generation ${group.generationId}") group.transitionTo(Dead) @@ -736,13 +741,13 @@ class GroupMetadataManager(brokerId: Int, (removedOffsets, group.is(Dead), group.generationId) } - val offsetsPartition = partitionFor(groupId) - val appendPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition) - getMagic(offsetsPartition) match { - case Some(magicValue) => - // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. - val timestampType = TimestampType.CREATE_TIME - val timestamp = time.milliseconds() + val offsetsPartition = partitionFor(groupId) + val appendPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition) + getMagic(offsetsPartition) match { + case Some(magicValue) => + // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. + val timestampType = TimestampType.CREATE_TIME + val timestamp = time.milliseconds() replicaManager.nonOfflinePartition(appendPartition).foreach { partition => val tombstones = ListBuffer.empty[SimpleRecord] @@ -788,7 +793,7 @@ class GroupMetadataManager(brokerId: Int, } } - info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.") + offsetsRemoved } def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean) { -- To stop receiving notification emails like this one, please contact j...@apache.org.