KAFKA-2720: expire group metadata when all offsets have expired

Author: Jason Gustafson <[email protected]>
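For readers skimming the patch below: each GroupMetadata now caches its own offsets and lingers in a new Empty state after its last member leaves; a periodic task then expires offsets and, once none remain, the group itself. A minimal sketch of that cleanup loop, assuming the GroupMetadata helpers introduced in this patch (removeExpiredOffsets, hasOffsets, is, transitionTo) and eliding the tombstone writes; this is an editor illustration, not the literal implementation:

    // Simplified sketch of the periodic cleanup added by this patch
    // (see GroupMetadataManager.cleanupGroupMetadata in the diff below).
    def cleanupSketch(groups: Iterable[GroupMetadata], now: Long): Unit = {
      groups.foreach { group =>
        group synchronized {
          if (!group.is(Dead)) {
            val expired = group.removeExpiredOffsets(now) // offsets past their expireTimestamp
            // tombstones for `expired` are appended to the offsets topic (elided here)
            if (group.is(Empty) && !group.hasOffsets) {
              group.transitionTo(Dead)                    // no members left and no live offsets
              // the group is then evicted and a group metadata tombstone written (elided here)
            }
          }
        }
      }
    }

Expiration is scheduled from GroupCoordinator.startup (enableMetadataExpiration defaults to true), so tests can opt out by passing false.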
Reviewers: Liquan Pei, Onur Karaman, Guozhang Wang Closes #1427 from hachikuji/KAFKA-2720 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c551675 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c551675 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c551675 Branch: refs/heads/trunk Commit: 8c551675adb11947e9f27b20a9195c9c4a20b432 Parents: fb42558 Author: Jason Gustafson <[email protected]> Authored: Wed Jun 15 19:46:42 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Jun 15 19:46:42 2016 -0700 ---------------------------------------------------------------------- .../scala/kafka/coordinator/DelayedJoin.scala | 4 +- .../kafka/coordinator/GroupCoordinator.scala | 304 +++++------ .../scala/kafka/coordinator/GroupMetadata.scala | 110 +++- .../coordinator/GroupMetadataManager.scala | 498 ++++++++++--------- .../kafka/coordinator/MemberMetadata.scala | 1 + .../GroupCoordinatorResponseTest.scala | 7 +- .../coordinator/GroupMetadataManagerTest.scala | 407 +++++++++++++++ .../kafka/coordinator/GroupMetadataTest.scala | 155 +++++- .../kafka/coordinator/MemberMetadataTest.scala | 11 +- 9 files changed, 1081 insertions(+), 416 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/DelayedJoin.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala index ae96e15..a62884a 100644 --- a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala +++ b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala @@ -30,8 +30,8 @@ import kafka.server.DelayedOperation * the rest of the group. */ private[coordinator] class DelayedJoin(coordinator: GroupCoordinator, - group: GroupMetadata, - sessionTimeout: Long) + group: GroupMetadata, + sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete) http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala index e9bbbd3..9c75f83 100644 --- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala @@ -31,15 +31,6 @@ import org.apache.kafka.common.requests.{OffsetFetchResponse, JoinGroupRequest} import scala.collection.{Map, Seq, immutable} -case class GroupConfig(groupMinSessionTimeoutMs: Int, - groupMaxSessionTimeoutMs: Int) - -case class JoinGroupResult(members: Map[String, Array[Byte]], - memberId: String, - generationId: Int, - subProtocol: String, - leaderId: String, - errorCode: Short) /** * GroupCoordinator handles general group membership and offset management. @@ -77,8 +68,10 @@ class GroupCoordinator(val brokerId: Int, /** * Startup logic executed at the same time when the server starts up. 
*/ - def startup() { + def startup(enableMetadataExpiration: Boolean = true) { info("Starting up.") + if (enableMetadataExpiration) + groupManager.enableMetadataExpiration() isActive.set(true) info("Startup complete.") } @@ -119,16 +112,17 @@ class GroupCoordinator(val brokerId: Int, // only try to create the group if the group is not unknown AND // the member id is UNKNOWN, if member is specified but group does not // exist we should reject the request - var group = groupManager.getGroup(groupId) - if (group == null) { - if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) { - responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) - } else { - group = groupManager.addGroup(new GroupMetadata(groupId, protocolType)) + groupManager.getGroup(groupId) match { + case None => + if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) { + responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) + } else { + val group = groupManager.addGroup(new GroupMetadata(groupId)) + doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback) + } + + case Some(group) => doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback) - } - } else { - doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback) } } } @@ -142,7 +136,7 @@ class GroupCoordinator(val brokerId: Int, protocols: List[(String, Array[Byte])], responseCallback: JoinCallback) { group synchronized { - if (group.protocolType != protocolType || !group.supportsProtocols(protocols.map(_._1).toSet)) { + if (!group.is(Empty) && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) { // if the new member does not support the group protocol, reject it responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code)) } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) { @@ -160,7 +154,7 @@ class GroupCoordinator(val brokerId: Int, case PreparingRebalance => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { - addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback) + addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback) } else { val member = group.get(memberId) updateMemberAndRebalance(group, member, protocols, responseCallback) @@ -168,7 +162,7 @@ class GroupCoordinator(val brokerId: Int, case AwaitingSync => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { - addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback) + addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback) } else { val member = group.get(memberId) if (member.matches(protocols)) { @@ -192,10 +186,10 @@ class GroupCoordinator(val brokerId: Int, } } - case Stable => + case Empty | Stable => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { // if the member id is unknown, register the member to the group - addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback) + addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback) } else { val member = group.get(memberId) if (memberId == group.leaderId || !member.matches(protocols)) { @@ -233,11 +227,10 @@ class GroupCoordinator(val brokerId: Int, } else if (!isCoordinatorForGroup(groupId)) { 
responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP.code) } else { - val group = groupManager.getGroup(groupId) - if (group == null) - responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code) - else - doSyncGroup(group, generation, memberId, groupAssignment, responseCallback) + groupManager.getGroup(groupId) match { + case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code) + case Some(group) => doSyncGroup(group, generation, memberId, groupAssignment, responseCallback) + } } } @@ -255,7 +248,7 @@ class GroupCoordinator(val brokerId: Int, responseCallback(Array.empty, Errors.ILLEGAL_GENERATION.code) } else { group.currentState match { - case Dead => + case Empty | Dead => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code) case PreparingRebalance => @@ -301,7 +294,8 @@ class GroupCoordinator(val brokerId: Int, } // store the group metadata without holding the group lock to avoid the potential - // for deadlock when the callback is invoked + // for deadlock if the callback is invoked holding other locks (e.g. the replica + // state change lock) delayedGroupStore.foreach(groupManager.store) } @@ -313,26 +307,25 @@ class GroupCoordinator(val brokerId: Int, } else if (isCoordinatorLoadingInProgress(groupId)) { responseCallback(Errors.GROUP_LOAD_IN_PROGRESS.code) } else { - val group = groupManager.getGroup(groupId) - if (group == null) { - // if the group is marked as dead, it means some other thread has just removed the group - // from the coordinator metadata; this is likely that the group has migrated to some other - // coordinator OR the group is in a transient unstable phase. Let the consumer to retry - // joining without specified consumer id, - responseCallback(Errors.UNKNOWN_MEMBER_ID.code) - } else { - group synchronized { - if (group.is(Dead)) { - responseCallback(Errors.UNKNOWN_MEMBER_ID.code) - } else if (!group.has(consumerId)) { - responseCallback(Errors.UNKNOWN_MEMBER_ID.code) - } else { - val member = group.get(consumerId) - removeHeartbeatForLeavingMember(group, member) - onMemberFailure(group, member) - responseCallback(Errors.NONE.code) + groupManager.getGroup(groupId) match { + case None => + // if the group is marked as dead, it means some other thread has just removed the group + // from the coordinator metadata; this is likely that the group has migrated to some other + // coordinator OR the group is in a transient unstable phase. Let the consumer to retry + // joining without specified consumer id, + responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + + case Some(group) => + group synchronized { + if (group.is(Dead) || !group.has(consumerId)) { + responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + } else { + val member = group.get(consumerId) + removeHeartbeatForLeavingMember(group, member) + onMemberFailure(group, member) + responseCallback(Errors.NONE.code) + } } - } } } } @@ -349,29 +342,30 @@ class GroupCoordinator(val brokerId: Int, // the group is still loading, so respond just blindly responseCallback(Errors.NONE.code) } else { - val group = groupManager.getGroup(groupId) - if (group == null) { - responseCallback(Errors.UNKNOWN_MEMBER_ID.code) - } else { - group synchronized { - if (group.is(Dead)) { - // if the group is marked as dead, it means some other thread has just removed the group - // from the coordinator metadata; this is likely that the group has migrated to some other - // coordinator OR the group is in a transient unstable phase. 
Let the member retry - // joining without the specified member id, - responseCallback(Errors.UNKNOWN_MEMBER_ID.code) - } else if (!group.is(Stable)) { - responseCallback(Errors.REBALANCE_IN_PROGRESS.code) - } else if (!group.has(memberId)) { - responseCallback(Errors.UNKNOWN_MEMBER_ID.code) - } else if (generationId != group.generationId) { - responseCallback(Errors.ILLEGAL_GENERATION.code) - } else { - val member = group.get(memberId) - completeAndScheduleNextHeartbeatExpiration(group, member) - responseCallback(Errors.NONE.code) + groupManager.getGroup(groupId) match { + case None => responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + case Some(group) => + group synchronized { + if (group.is(Empty)) { + responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + } else if (group.is(Dead)) { + // if the group is marked as dead, it means some other thread has just removed the group + // from the coordinator metadata; this is likely that the group has migrated to some other + // coordinator OR the group is in a transient unstable phase. Let the member retry + // joining without the specified member id, + responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + } else if (!group.is(Stable)) { + responseCallback(Errors.REBALANCE_IN_PROGRESS.code) + } else if (!group.has(memberId)) { + responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + } else if (generationId != group.generationId) { + responseCallback(Errors.ILLEGAL_GENERATION.code) + } else { + val member = group.get(memberId) + completeAndScheduleNextHeartbeatExpiration(group, member) + responseCallback(Errors.NONE.code) + } } - } } } } @@ -381,8 +375,6 @@ class GroupCoordinator(val brokerId: Int, generationId: Int, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicPartition, Short] => Unit) { - var delayedOffsetStore: Option[DelayedStore] = None - if (!isActive.get) { responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)) } else if (!isCoordinatorForGroup(groupId)) { @@ -390,33 +382,48 @@ class GroupCoordinator(val brokerId: Int, } else if (isCoordinatorLoadingInProgress(groupId)) { responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS.code)) } else { - val group = groupManager.getGroup(groupId) - if (group == null) { - if (generationId < 0) - // the group is not relying on Kafka for partition management, so allow the commit - delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId, offsetMetadata, - responseCallback)) - else - // the group has failed over to this coordinator (which will be handled in KAFKA-2017), - // or this is a request coming from an older generation. 
either way, reject the commit - responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) - } else { - group synchronized { - if (group.is(Dead)) { - responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code)) - } else if (group.is(AwaitingSync)) { - responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code)) - } else if (!group.has(memberId)) { - responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code)) - } else if (generationId != group.generationId) { - responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) + groupManager.getGroup(groupId) match { + case None => + if (generationId < 0) { + // the group is not relying on Kafka for group management, so allow the commit + val group = groupManager.addGroup(new GroupMetadata(groupId)) + doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback) } else { - val member = group.get(memberId) - completeAndScheduleNextHeartbeatExpiration(group, member) - delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId, - offsetMetadata, responseCallback)) + // or this is a request coming from an older generation. either way, reject the commit + responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) } - } + + case Some(group) => + doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback) + } + } + } + + def doCommitOffsets(group: GroupMetadata, + memberId: String, + generationId: Int, + offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], + responseCallback: immutable.Map[TopicPartition, Short] => Unit) { + var delayedOffsetStore: Option[DelayedStore] = None + + group synchronized { + if (group.is(Dead)) { + responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code)) + } else if (generationId < 0 && group.is(Empty)) { + // the group is only using Kafka to store offsets + delayedOffsetStore = Some(groupManager.prepareStoreOffsets(group, memberId, generationId, + offsetMetadata, responseCallback)) + } else if (group.is(AwaitingSync)) { + responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code)) + } else if (!group.has(memberId)) { + responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code)) + } else if (generationId != group.generationId) { + responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) + } else { + val member = group.get(memberId) + completeAndScheduleNextHeartbeatExpiration(group, member) + delayedOffsetStore = Some(groupManager.prepareStoreOffsets(group, memberId, generationId, + offsetMetadata, responseCallback)) } } @@ -424,12 +431,14 @@ class GroupCoordinator(val brokerId: Int, delayedOffsetStore.foreach(groupManager.store) } + def handleFetchOffsets(groupId: String, partitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = { if (!isActive.get) { partitions.map { case topicPartition => (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))}.toMap } else if (!isCoordinatorForGroup(groupId)) { + debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId)) partitions.map { case topicPartition => (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))}.toMap } else if (isCoordinatorLoadingInProgress(groupId)) { @@ 
-459,13 +468,12 @@ class GroupCoordinator(val brokerId: Int, } else if (isCoordinatorLoadingInProgress(groupId)) { (Errors.GROUP_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup) } else { - val group = groupManager.getGroup(groupId) - if (group == null) { - (Errors.NONE, GroupCoordinator.DeadGroup) - } else { - group synchronized { - (Errors.NONE, group.summary) - } + groupManager.getGroup(groupId) match { + case None => (Errors.NONE, GroupCoordinator.DeadGroup) + case Some(group) => + group synchronized { + (Errors.NONE, group.summary) + } } } } @@ -477,7 +485,7 @@ class GroupCoordinator(val brokerId: Int, group.transitionTo(Dead) previousState match { - case Dead => + case Empty | Dead => case PreparingRebalance => for (member <- group.allMemberMetadata) { if (member.awaitingJoinCallback != null) { @@ -502,7 +510,7 @@ class GroupCoordinator(val brokerId: Int, private def onGroupLoaded(group: GroupMetadata) { group synchronized { info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}") - assert(group.is(Stable)) + assert(group.is(Stable) || group.is(Empty)) group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _)) } } @@ -580,12 +588,13 @@ class GroupCoordinator(val brokerId: Int, private def addMemberAndRebalance(sessionTimeoutMs: Int, clientId: String, clientHost: String, + protocolType: String, protocols: List[(String, Array[Byte])], group: GroupMetadata, callback: JoinCallback) = { // use the client-id with a random id suffix as the member-id val memberId = clientId + "-" + group.generateMemberIdSuffix - val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocols) + val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols) member.awaitingJoinCallback = callback group.add(member.memberId, member) maybePrepareRebalance(group) @@ -626,7 +635,7 @@ class GroupCoordinator(val brokerId: Int, trace("Member %s in group %s has failed".format(member.memberId, group.groupId)) group.remove(member.memberId) group.currentState match { - case Dead => + case Dead | Empty => case Stable | AwaitingSync => maybePrepareRebalance(group) case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId)) } @@ -645,42 +654,49 @@ class GroupCoordinator(val brokerId: Int, } def onCompleteJoin(group: GroupMetadata) { + var delayedStore: Option[DelayedStore] = None group synchronized { - val failedMembers = group.notYetRejoinedMembers - if (group.isEmpty || failedMembers.nonEmpty) { - failedMembers.foreach { failedMember => - group.remove(failedMember.memberId) - // TODO: cut the socket connection to the client - } - - // TODO KAFKA-2720: only remove group in the background thread - if (group.isEmpty) { - group.transitionTo(Dead) - groupManager.removeGroup(group) - info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId)) - } + // remove any members who haven't joined the group yet + group.notYetRejoinedMembers.foreach { failedMember => + group.remove(failedMember.memberId) + // TODO: cut the socket connection to the client } + if (!group.is(Dead)) { group.initNextGeneration() - info("Stabilized group %s generation %s".format(group.groupId, group.generationId)) - - // trigger the awaiting join group response callback for all the members after rebalancing - for (member <- group.allMemberMetadata) { - assert(member.awaitingJoinCallback != null) - val joinResult = JoinGroupResult( - members=if 
(member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty }, - memberId=member.memberId, - generationId=group.generationId, - subProtocol=group.protocol, - leaderId=group.leaderId, - errorCode=Errors.NONE.code) - - member.awaitingJoinCallback(joinResult) - member.awaitingJoinCallback = null - completeAndScheduleNextHeartbeatExpiration(group, member) + if (group.is(Empty)) { + info(s"Group ${group.groupId} with generation ${group.generationId} is now empty") + delayedStore = Some(groupManager.prepareStoreGroup(group, Map.empty, errorCode => { + if (errorCode != Errors.NONE.code) { + // we failed to persist the empty group. if we don't retry (which is how + // we handle the situation when a normal rebalance fails, then a coordinator + // change will cause the old generation to come back to life. + } + })) + } else { + info(s"Stabilized group ${group.groupId} generation ${group.generationId}") + + // trigger the awaiting join group response callback for all the members after rebalancing + for (member <- group.allMemberMetadata) { + assert(member.awaitingJoinCallback != null) + val joinResult = JoinGroupResult( + members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty }, + memberId=member.memberId, + generationId=group.generationId, + subProtocol=group.protocol, + leaderId=group.leaderId, + errorCode=Errors.NONE.code) + + member.awaitingJoinCallback(joinResult) + member.awaitingJoinCallback = null + completeAndScheduleNextHeartbeatExpiration(group, member) + } } } } + + // call without holding the group lock + delayedStore.foreach(groupManager.store) } def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = { @@ -757,3 +773,13 @@ object GroupCoordinator { } } + +case class GroupConfig(groupMinSessionTimeoutMs: Int, + groupMaxSessionTimeoutMs: Int) + +case class JoinGroupResult(members: Map[String, Array[Byte]], + memberId: String, + generationId: Int, + subProtocol: String, + leaderId: String, + errorCode: Short) http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/GroupMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala index 4fa656e..b455964 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala @@ -18,10 +18,10 @@ package kafka.coordinator import kafka.utils.nonthreadsafe - import java.util.UUID -import org.apache.kafka.common.protocol.Errors +import kafka.common.OffsetAndMetadata +import org.apache.kafka.common.TopicPartition import collection.mutable @@ -37,7 +37,8 @@ private[coordinator] sealed trait GroupState { def state: Byte } * allow offset commits from previous generation * allow offset fetch requests * transition: some members have joined by the timeout => AwaitingSync - * all members have left the group => Dead + * all members have left the group => Empty + * group is removed by partition emigration => Dead */ private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 } @@ -52,6 +53,7 @@ private[coordinator] case object PreparingRebalance extends GroupState { val sta * join group from new member or existing member with updated metadata => PreparingRebalance * leave group from existing member => PreparingRebalance * member 
failure detected => PreparingRebalance + * group is removed by partition emigration => Dead */ private[coordinator] case object AwaitingSync extends GroupState { val state: Byte = 5} @@ -67,11 +69,12 @@ private[coordinator] case object AwaitingSync extends GroupState { val state: By * leave group from existing member => PreparingRebalance * leader join-group received => PreparingRebalance * follower join-group with new metadata => PreparingRebalance + * group is removed by partition emigration => Dead */ private[coordinator] case object Stable extends GroupState { val state: Byte = 3 } /** - * Group has no more members + * Group has no more members and its metadata is being removed * * action: respond to join group with UNKNOWN_MEMBER_ID * respond to sync group with UNKNOWN_MEMBER_ID @@ -83,13 +86,31 @@ private[coordinator] case object Stable extends GroupState { val state: Byte = 3 */ private[coordinator] case object Dead extends GroupState { val state: Byte = 4 } +/** + * Group has no more members, but lingers until all offsets have expired. This state + * also represents groups which use Kafka only for offset commits and have no members. + * + * action: respond normally to join group from new members + * respond to sync group with UNKNOWN_MEMBER_ID + * respond to heartbeat with UNKNOWN_MEMBER_ID + * respond to leave group with UNKNOWN_MEMBER_ID + * respond to offset commit with UNKNOWN_MEMBER_ID + * allow offset fetch requests + * transition: last offsets removed in periodic expiration task => Dead + * join group from a new member => PreparingRebalance + * group is removed by partition emigration => Dead + * group is removed by expiration => Dead + */ +private[coordinator] case object Empty extends GroupState { val state: Byte = 5 } + private object GroupMetadata { private val validPreviousStates: Map[GroupState, Set[GroupState]] = - Map(Dead -> Set(Stable, PreparingRebalance, AwaitingSync), + Map(Dead -> Set(Stable, PreparingRebalance, AwaitingSync, Empty, Dead), AwaitingSync -> Set(PreparingRebalance), Stable -> Set(AwaitingSync), - PreparingRebalance -> Set(Stable, AwaitingSync)) + PreparingRebalance -> Set(Stable, AwaitingSync, Empty), + Empty -> Set(PreparingRebalance)) } /** @@ -120,10 +141,14 @@ case class GroupSummary(state: String, * 3. 
leader id */ @nonthreadsafe -private[coordinator] class GroupMetadata(val groupId: String, val protocolType: String) { +private[coordinator] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) { + private var state: GroupState = initialState private val members = new mutable.HashMap[String, MemberMetadata] - private var state: GroupState = Stable + private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata] + private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata] + + var protocolType: Option[String] = None var generationId = 0 var leaderId: String = null var protocol: String = null @@ -134,6 +159,11 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType: def get(memberId: String) = members(memberId) def add(memberId: String, member: MemberMetadata) { + if (members.isEmpty) + this.protocolType = Some(member.protocolType) + + assert(groupId == member.groupId) + assert(this.protocolType.orNull == member.protocolType) assert(supportsProtocols(member.protocols)) if (leaderId == null) @@ -154,8 +184,6 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType: def currentState = state - def isEmpty = members.isEmpty - def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList def allMembers = members.keySet @@ -169,7 +197,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType: // TODO: decide if ids should be predictable or random def generateMemberIdSuffix = UUID.randomUUID().toString - def canRebalance = state == Stable || state == AwaitingSync + def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state) def transitionTo(groupState: GroupState) { assertValidTransition(groupState) @@ -201,14 +229,20 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType: } def supportsProtocols(memberProtocols: Set[String]) = { - isEmpty || (memberProtocols & candidateProtocols).nonEmpty + members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty } def initNextGeneration() = { assert(notYetRejoinedMembers == List.empty[MemberMetadata]) - generationId += 1 - protocol = selectProtocol - transitionTo(AwaitingSync) + if (members.nonEmpty) { + generationId += 1 + protocol = selectProtocol + transitionTo(AwaitingSync) + } else { + generationId += 1 + protocol = null + transitionTo(Empty) + } } def currentMemberMetadata: Map[String, Array[Byte]] = { @@ -219,18 +253,53 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType: def summary: GroupSummary = { if (is(Stable)) { - val members = this.members.values.map{ member => member.summary(protocol) }.toList - GroupSummary(state.toString, protocolType, protocol, members) + val members = this.members.values.map { member => member.summary(protocol) }.toList + GroupSummary(state.toString, protocolType.getOrElse(""), protocol, members) } else { val members = this.members.values.map{ member => member.summaryNoMetadata() }.toList - GroupSummary(state.toString, protocolType, GroupCoordinator.NoProtocol, members) + GroupSummary(state.toString, protocolType.getOrElse(""), GroupCoordinator.NoProtocol, members) } } def overview: GroupOverview = { - GroupOverview(groupId, protocolType) + GroupOverview(groupId, protocolType.getOrElse("")) + } + + def completePendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata) { + offsets.put(topicPartition, offset) + 
pendingOffsetCommits.get(topicPartition) match { + case Some(stagedOffset) if offset == stagedOffset => pendingOffsetCommits.remove(topicPartition) + case _ => + } + } + + def failPendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata): Unit = { + pendingOffsetCommits.get(topicPartition) match { + case Some(pendingOffset) if offset == pendingOffset => pendingOffsetCommits.remove(topicPartition) + case _ => + } + } + + def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]) { + pendingOffsetCommits ++= offsets + } + + def removeExpiredOffsets(startMs: Long) = { + val expiredOffsets = offsets.filter { + case (topicPartition, offset) => offset.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition) + } + offsets --= expiredOffsets.keySet + expiredOffsets } + def allOffsets = offsets.toMap + + def offset(topicPartition: TopicPartition) = offsets.get(topicPartition) + + def numOffsets = offsets.size + + def hasOffsets = offsets.nonEmpty || pendingOffsetCommits.nonEmpty + private def assertValidTransition(targetState: GroupState) { if (!GroupMetadata.validPreviousStates(targetState).contains(state)) throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state" @@ -240,4 +309,5 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType: override def toString = { "[%s,%s,%s,%s]".format(groupId, protocolType, currentState.toString, members) } -} \ No newline at end of file +} + http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index b968f97..915c360 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -17,12 +17,11 @@ package kafka.coordinator -import java.util.concurrent.locks.ReentrantReadWriteLock -import kafka.utils.CoreUtils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct} import org.apache.kafka.common.protocol.types.Type.STRING +import org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING import org.apache.kafka.common.protocol.types.Type.INT32 import org.apache.kafka.common.protocol.types.Type.INT64 import org.apache.kafka.common.protocol.types.Type.BYTES @@ -39,16 +38,18 @@ import kafka.metrics.KafkaMetricsGroup import kafka.common.TopicAndPartition import kafka.common.MessageFormatter import kafka.server.ReplicaManager + import scala.collection._ import java.io.PrintStream import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock + import com.yammer.metrics.core.Gauge +import kafka.utils.CoreUtils.inLock import org.apache.kafka.common.internals.TopicConstants -case class DelayedStore(messageSet: Map[TopicPartition, MessageSet], - callback: Map[TopicPartition, PartitionResponse] => Unit) class GroupMetadataManager(val brokerId: Int, val config: OffsetConfig, @@ -56,72 +57,77 @@ class GroupMetadataManager(val brokerId: Int, zkUtils: ZkUtils, time: Time) extends Logging with KafkaMetricsGroup { - /* offsets cache */ - private val 
offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata] + private val groupMetadataCache = new Pool[String, GroupMetadata] - /* group metadata cache */ - private val groupsCache = new Pool[String, GroupMetadata] + /* lock protecting access to loading and owned partition sets */ + private val partitionLock = new ReentrantLock() - /* partitions of consumer groups that are being loaded, its lock should be always called BEFORE offsetExpireLock and the group lock if needed */ + /* partitions of consumer groups that are being loaded, its lock should be always called BEFORE the group lock if needed */ private val loadingPartitions: mutable.Set[Int] = mutable.Set() /* partitions of consumer groups that are assigned, using the same loading partition lock */ private val ownedPartitions: mutable.Set[Int] = mutable.Set() - /* lock for expiring stale offsets, it should be always called BEFORE the group lock if needed */ - private val offsetExpireLock = new ReentrantReadWriteLock() - /* shutting down flag */ private val shuttingDown = new AtomicBoolean(false) /* number of partitions for the consumer metadata topic */ private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount - /* Single-thread scheduler to handling offset/group metadata cache loading and unloading */ + /* single-thread scheduler to handle offset/group metadata cache loading and unloading */ private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-") this.logIdent = "[Group Metadata Manager on Broker " + brokerId + "]: " - scheduler.startup() - scheduler.schedule(name = "delete-expired-consumer-offsets", - fun = deleteExpiredOffsets, - period = config.offsetsRetentionCheckIntervalMs, - unit = TimeUnit.MILLISECONDS) - newGauge("NumOffsets", new Gauge[Int] { - def value = offsetsCache.size + def value = groupMetadataCache.values.map(group => { + group synchronized { group.numOffsets } + }).sum } ) newGauge("NumGroups", new Gauge[Int] { - def value = groupsCache.size + def value = groupMetadataCache.size } ) - def currentGroups(): Iterable[GroupMetadata] = groupsCache.values + def enableMetadataExpiration() { + scheduler.startup() + + scheduler.schedule(name = "delete-expired-group-metadata", + fun = cleanupGroupMetadata, + period = config.offsetsRetentionCheckIntervalMs, + unit = TimeUnit.MILLISECONDS) + } + + def currentGroups(): Iterable[GroupMetadata] = groupMetadataCache.values + + def isPartitionOwned(partition: Int) = inLock(partitionLock) { ownedPartitions.contains(partition) } + + def isPartitionLoading(partition: Int) = inLock(partitionLock) { loadingPartitions.contains(partition) } def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount - def isGroupLocal(groupId: String): Boolean = loadingPartitions synchronized ownedPartitions.contains(partitionFor(groupId)) + def isGroupLocal(groupId: String): Boolean = isPartitionOwned(partitionFor(groupId)) - def isGroupLoading(groupId: String): Boolean = loadingPartitions synchronized loadingPartitions.contains(partitionFor(groupId)) + def isGroupLoading(groupId: String): Boolean = isPartitionLoading(partitionFor(groupId)) - def isLoading(): Boolean = loadingPartitions synchronized loadingPartitions.nonEmpty + def isLoading(): Boolean = inLock(partitionLock) { loadingPartitions.nonEmpty } /** * Get the group associated with the given groupId, or null if not found */ - def getGroup(groupId: String): GroupMetadata = { - groupsCache.get(groupId) + def getGroup(groupId: String): 
Option[GroupMetadata] = { + Option(groupMetadataCache.get(groupId)) } /** * Add a group or get the group associated with the given groupId if it already exists */ def addGroup(group: GroupMetadata): GroupMetadata = { - val currentGroup = groupsCache.putIfNotExists(group.groupId, group) + val currentGroup = groupMetadataCache.putIfNotExists(group.groupId, group) if (currentGroup != null) { currentGroup } else { @@ -130,13 +136,15 @@ class GroupMetadataManager(val brokerId: Int, } /** - * Remove all metadata associated with the group - * @param group + * Remove the group from the cache and delete all metadata associated with it. This should be + * called only after all offsets for the group have expired and no members are remaining (i.e. + * it is in the Empty state). */ - def removeGroup(group: GroupMetadata) { + private def evictGroupAndDeleteMetadata(group: GroupMetadata) { // guard this removal in case of concurrent access (e.g. if a delayed join completes with no members - // while the group is being removed due to coordinator emigration) - if (groupsCache.remove(group.groupId, group)) { + // while the group is being removed due to coordinator emigration). We also avoid writing the tombstone + // when the generationId is 0, since this group is only using Kafka for offset storage. + if (groupMetadataCache.remove(group.groupId, group) && group.generationId > 0) { // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say, // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and // retry removing this group. @@ -154,7 +162,6 @@ class GroupMetadataManager(val brokerId: Int, try { // do not need to require acks since even if the tombstone is lost, // it will be appended again by the new leader - // TODO KAFKA-2720: periodic purging instead of immediate removal of groups partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone)) } catch { case t: Throwable => @@ -227,20 +234,20 @@ class GroupMetadataManager(val brokerId: Int, DelayedStore(groupMetadataMessageSet, putCacheCallback) } - def store(delayedAppend: DelayedStore) { + def store(delayedStore: DelayedStore) { // call replica manager to append the group message replicaManager.appendMessages( config.offsetCommitTimeoutMs.toLong, config.offsetCommitRequiredAcks, true, // allow appending to internal offset topic - delayedAppend.messageSet, - delayedAppend.callback) + delayedStore.messageSet, + delayedStore.callback) } /** * Store offsets by appending it to the replicated log and then inserting to cache */ - def prepareStoreOffsets(groupId: String, + def prepareStoreOffsets(group: GroupMetadata, consumerId: String, generationId: Int, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], @@ -252,16 +259,16 @@ class GroupMetadataManager(val brokerId: Int, // construct the message set to append val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => - val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(groupId)) + val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId)) new Message( - key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition), + key = GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition), bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata), timestamp = timestamp, 
magicValue = magicValue ) }.toSeq - val offsetTopicPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(groupId)) + val offsetTopicPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId)) val offsetsAndMetadataMessageSet = Map(offsetTopicPartition -> new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*)) @@ -278,29 +285,38 @@ class GroupMetadataManager(val brokerId: Int, val status = responseStatus(offsetTopicPartition) val responseCode = - if (status.errorCode == Errors.NONE.code) { - filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) => - putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata) + group synchronized { + if (status.errorCode == Errors.NONE.code) { + if (!group.is(Dead)) { + filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) => + group.completePendingOffsetWrite(topicAndPartition, offsetAndMetadata) + } + } + Errors.NONE.code + } else { + if (!group.is(Dead)) { + filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) => + group.failPendingOffsetWrite(topicAndPartition, offsetAndMetadata) + } + } + + debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s" + .format(filteredOffsetMetadata, group.groupId, consumerId, generationId, Errors.forCode(status.errorCode).exceptionName)) + + // transform the log append error code to the corresponding the commit status error code + if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) + Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code + else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code) + Errors.NOT_COORDINATOR_FOR_GROUP.code + else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code + || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code + || status.errorCode == Errors.INVALID_FETCH_SIZE.code) + Errors.INVALID_COMMIT_OFFSET_SIZE.code + else + status.errorCode } - Errors.NONE.code - } else { - debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s" - .format(filteredOffsetMetadata, groupId, consumerId, generationId, Errors.forCode(status.errorCode).exceptionName)) - - // transform the log append error code to the corresponding the commit status error code - if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) - Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code - else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code) - Errors.NOT_COORDINATOR_FOR_GROUP.code - else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code - || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code - || status.errorCode == Errors.INVALID_FETCH_SIZE.code) - Errors.INVALID_COMMIT_OFFSET_SIZE.code - else - status.errorCode } - // compute the final error codes for the commit response val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) @@ -313,6 +329,10 @@ class GroupMetadataManager(val brokerId: Int, responseCallback(commitStatus) } + group synchronized { + group.prepareOffsetCommit(offsetMetadata) + } + DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback) } @@ -320,27 +340,36 @@ class GroupMetadataManager(val brokerId: Int, * The most important guarantee that this API provides is that it should never return a stale offset. 
i.e., it either * returns the current offset or it begins to sync the cache from the log (and returns an error code). */ - def getOffsets(group: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = { - trace("Getting offsets %s for group %s.".format(topicPartitions, group)) - - if (isGroupLocal(group)) { - if (topicPartitions.isEmpty) { - // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.) - offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndMetadata) => - (groupTopicPartition.topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)) - }.toMap - } else { - topicPartitions.map { topicPartition => - val groupTopicPartition = GroupTopicPartition(group, topicPartition) - (groupTopicPartition.topicPartition, getOffset(groupTopicPartition)) - }.toMap - } - } else { - debug("Could not fetch offsets for group %s (not offset coordinator).".format(group)) + def getOffsets(groupId: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = { + trace("Getting offsets %s for group %s.".format(topicPartitions, groupId)) + val group = groupMetadataCache.get(groupId) + if (group == null) { topicPartitions.map { topicPartition => - val groupTopicPartition = GroupTopicPartition(group, topicPartition) - (groupTopicPartition.topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code)) + (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code)) }.toMap + } else { + group synchronized { + if (group.is(Dead)) { + topicPartitions.map { topicPartition => + (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code)) + }.toMap + } else { + if (topicPartitions.isEmpty) { + // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.) 
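+            // Illustrative sketch (assumed, not part of this patch's diff): offsets of a
+            // member-less, offsets-only group remain fetchable until they expire, because the
+            // group sits in Empty rather than Dead. Roughly:
+            //   val tp = new TopicPartition("topic", 0)
+            //   val fetched = groupMetadataManager.getOffsets("offsets-only-group", Seq(tp))
+            //   // before expiration: the committed offset with Errors.NONE
+            //   // after cleanupGroupMetadata removes it: OffsetFetchResponse.INVALID_OFFSET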
+ group.allOffsets.map { case (topicPartition, offsetAndMetadata) => + (topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)) + } + } else { + topicPartitions.map { topicPartition => + group.offset(topicPartition) match { + case None => (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code)) + case Some(offsetAndMetadata) => + (topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)) + } + }.toMap + } + } + } } } @@ -355,7 +384,7 @@ class GroupMetadataManager(val brokerId: Int, def loadGroupsAndOffsets() { info("Loading offsets and group metadata from " + topicPartition) - loadingPartitions synchronized { + inLock(partitionLock) { if (loadingPartitions.contains(offsetsPartition)) { info("Offset load from %s already in progress.".format(topicPartition)) return @@ -371,74 +400,70 @@ class GroupMetadataManager(val brokerId: Int, var currOffset = log.logSegments.head.baseOffset val buffer = ByteBuffer.allocate(config.loadBufferSize) // loop breaks if leader changes at any time during the load, since getHighWatermark is -1 - inWriteLock(offsetExpireLock) { - val loadedGroups = mutable.Map[String, GroupMetadata]() - val removedGroups = mutable.Set[String]() - - while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) { - buffer.clear() - val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet] - messages.readInto(buffer, 0) - val messageSet = new ByteBufferMessageSet(buffer) - messageSet.foreach { msgAndOffset => - require(msgAndOffset.message.key != null, "Offset entry key should not be null") - val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key) - - if (baseKey.isInstanceOf[OffsetKey]) { - // load offset - val key = baseKey.key.asInstanceOf[GroupTopicPartition] - if (msgAndOffset.message.payload == null) { - if (offsetsCache.remove(key) != null) - trace("Removed offset for %s due to tombstone entry.".format(key)) - else - trace("Ignoring redundant tombstone for %s.".format(key)) - } else { - // special handling for version 0: - // set the expiration time stamp as commit time stamp + server default retention time - val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload) - putOffset(key, value.copy ( - expireTimestamp = { - if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) - value.commitTimestamp + config.offsetsRetentionMs - else - value.expireTimestamp - } - )) - trace("Loaded offset %s for %s.".format(value, key)) - } + val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]() + val removedOffsets = mutable.Set[GroupTopicPartition]() + val loadedGroups = mutable.Map[String, GroupMetadata]() + val removedGroups = mutable.Set[String]() + + while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) { + buffer.clear() + val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet] + messages.readInto(buffer, 0) + val messageSet = new ByteBufferMessageSet(buffer) + messageSet.foreach { msgAndOffset => + require(msgAndOffset.message.key != null, "Offset entry key should not be null") + val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key) + + if (baseKey.isInstanceOf[OffsetKey]) { + // load offset + val key = 
baseKey.key.asInstanceOf[GroupTopicPartition] + if (msgAndOffset.message.payload == null) { + loadedOffsets.remove(key) + removedOffsets.add(key) } else { - // load group metadata - val groupId = baseKey.key.asInstanceOf[String] - val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload) - if (groupMetadata != null) { - trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}") - removedGroups.remove(groupId) - loadedGroups.put(groupId, groupMetadata) - } else { - loadedGroups.remove(groupId) - removedGroups.add(groupId) - } + val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload) + loadedOffsets.put(key, value) + removedOffsets.remove(key) + } + } else { + // load group metadata + val groupId = baseKey.key.asInstanceOf[String] + val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload) + if (groupMetadata != null) { + trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}") + removedGroups.remove(groupId) + loadedGroups.put(groupId, groupMetadata) + } else { + loadedGroups.remove(groupId) + removedGroups.add(groupId) } - - currOffset = msgAndOffset.nextOffset } - } - loadedGroups.values.foreach { group => - val currentGroup = addGroup(group) - if (group != currentGroup) - debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " + - s"because there is already a cached group with generation ${currentGroup.generationId}") - else - onGroupLoaded(group) + currOffset = msgAndOffset.nextOffset } + } - removedGroups.foreach { groupId => - val group = groupsCache.get(groupId) - if (group != null) - throw new IllegalStateException(s"Unexpected unload of acitve group ${group.groupId} while " + - s"loading partition ${topicPartition}") - } + val (groupOffsets, noGroupOffsets) = loadedOffsets + .groupBy(_._1.group) + .mapValues(_.map{ case (groupTopicPartition, offsetAndMetadata) => (groupTopicPartition.topicPartition, offsetAndMetadata)}) + .partition(value => loadedGroups.contains(value._1)) + + loadedGroups.values.foreach { group => + val offsets = groupOffsets.getOrElse(group.groupId, Map.empty) + loadGroup(group, offsets) + onGroupLoaded(group) + } + + noGroupOffsets.foreach { case (groupId, offsets) => + val group = new GroupMetadata(groupId) + loadGroup(group, offsets) + onGroupLoaded(group) + } + + removedGroups.foreach { groupId => + if (groupMetadataCache.contains(groupId)) + throw new IllegalStateException(s"Unexpected unload of active group ${groupId} while " + + s"loading partition ${topicPartition}") } if (!shuttingDown.get()) @@ -453,7 +478,7 @@ class GroupMetadataManager(val brokerId: Int, error("Error in loading offsets from " + topicPartition, t) } finally { - loadingPartitions synchronized { + inLock(partitionLock) { ownedPartitions.add(offsetsPartition) loadingPartitions.remove(offsetsPartition) } @@ -461,10 +486,37 @@ class GroupMetadataManager(val brokerId: Int, } } + private def loadGroup(group: GroupMetadata, offsets: Iterable[(TopicPartition, OffsetAndMetadata)]): Unit = { + val currentGroup = addGroup(group) + if (group != currentGroup) { + debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " + + s"because there is already a cached group with generation ${currentGroup.generationId}") + } else { + + offsets.foreach { + case (topicPartition, 
offsetAndMetadata) => { + val offset = offsetAndMetadata.copy ( + expireTimestamp = { + // special handling for version 0: + // set the expiration time stamp as commit time stamp + server default retention time + if (offsetAndMetadata.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) + offsetAndMetadata.commitTimestamp + config.offsetsRetentionMs + else + offsetAndMetadata.expireTimestamp + } + ) + trace("Loaded offset %s for %s.".format(offset, topicPartition)) + group.completePendingOffsetWrite(topicPartition, offset) + } + } + } + } + /** * When this broker becomes a follower for an offsets topic partition clear out the cache for groups that belong to * that partition. - * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache. + * + * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache. */ def removeGroupsForPartition(offsetsPartition: Int, onGroupUnloaded: GroupMetadata => Unit) { @@ -475,31 +527,22 @@ class GroupMetadataManager(val brokerId: Int, var numOffsetsRemoved = 0 var numGroupsRemoved = 0 - loadingPartitions synchronized { + inLock(partitionLock) { // we need to guard the group removal in cache in the loading partition lock // to prevent coordinator's check-and-get-group race condition ownedPartitions.remove(offsetsPartition) - // clear the offsets for this partition in the cache - /** * NOTE: we need to put this in the loading partition lock as well to prevent race condition of the leader-is-local check * in getOffsets to protects against fetching from an empty/cleared offset cache (i.e., cleared due to a leader->follower * transition right after the check and clear the cache), causing offset fetch return empty offsets with NONE error code */ - offsetsCache.keys.foreach { key => - if (partitionFor(key.group) == offsetsPartition) { - offsetsCache.remove(key) - numOffsetsRemoved += 1 - } - } - - // clear the groups for this partition in the cache - for (group <- groupsCache.values) { + for (group <- groupMetadataCache.values) { if (partitionFor(group.groupId) == offsetsPartition) { onGroupUnloaded(group) - groupsCache.remove(group.groupId, group) + groupMetadataCache.remove(group.groupId, group) numGroupsRemoved += 1 + numOffsetsRemoved += group.numOffsets } } } @@ -512,82 +555,53 @@ class GroupMetadataManager(val brokerId: Int, } } - /** - * Fetch the current offset for the given group/topic/partition from the underlying offsets storage. - * - * @param key The requested group-topic-partition - * @return If the key is present, return the offset and metadata; otherwise return None - */ - private def getOffset(key: GroupTopicPartition): OffsetFetchResponse.PartitionData = { - val offsetAndMetadata = offsetsCache.get(key) - if (offsetAndMetadata == null) - new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code) - else - new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code) - } - - /** - * Put the (already committed) offset for the given group/topic/partition into the cache. 
- * - * @param key The group-topic-partition - * @param offsetAndMetadata The offset/metadata to be stored - */ - private def putOffset(key: GroupTopicPartition, offsetAndMetadata: OffsetAndMetadata) { - offsetsCache.put(key, offsetAndMetadata) - } - - private def deleteExpiredOffsets() { - debug("Collecting expired offsets.") + // visible for testing + private[coordinator] def cleanupGroupMetadata() { val startMs = time.milliseconds() - - val numExpiredOffsetsRemoved = inWriteLock(offsetExpireLock) { - val expiredOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) => - offsetAndMetadata.expireTimestamp < startMs - } - - debug("Found %d expired offsets.".format(expiredOffsets.size)) - - // delete the expired offsets from the table and generate tombstone messages to remove them from the log - val tombstonesForPartition = expiredOffsets.map { case (groupTopicAndPartition, offsetAndMetadata) => - val offsetsPartition = partitionFor(groupTopicAndPartition.group) - trace("Removing expired offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata)) - - offsetsCache.remove(groupTopicAndPartition) - - val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group, - groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition) - - val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition) - (offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)) - }.groupBy { case (partition, tombstone) => partition } - - // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say, - // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles. 
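    // Illustrative sketch (assumed, not part of the diff): the global offsetsCache scan removed
    // here is replaced by a per-group filter that also skips offsets with a still-pending commit,
    // so an in-flight write can never be expired out from under the group:
    //   offsets.filter { case (tp, offset) =>
    //     offset.expireTimestamp < startMs && !pendingOffsetCommits.contains(tp)
    //   }
    // (the real GroupMetadata.removeExpiredOffsets, shown earlier in this diff, also removes the
    // expired entries from the group's offset cache)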
@@ -512,82 +555,53 @@ class GroupMetadataManager(val brokerId: Int,
     }
   }
 
-  /**
-   * Fetch the current offset for the given group/topic/partition from the underlying offsets storage.
-   *
-   * @param key The requested group-topic-partition
-   * @return If the key is present, return the offset and metadata; otherwise return None
-   */
-  private def getOffset(key: GroupTopicPartition): OffsetFetchResponse.PartitionData = {
-    val offsetAndMetadata = offsetsCache.get(key)
-    if (offsetAndMetadata == null)
-      new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code)
-    else
-      new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)
-  }
-
-  /**
-   * Put the (already committed) offset for the given group/topic/partition into the cache.
-   *
-   * @param key The group-topic-partition
-   * @param offsetAndMetadata The offset/metadata to be stored
-   */
-  private def putOffset(key: GroupTopicPartition, offsetAndMetadata: OffsetAndMetadata) {
-    offsetsCache.put(key, offsetAndMetadata)
-  }
-
-  private def deleteExpiredOffsets() {
-    debug("Collecting expired offsets.")
+  // visible for testing
+  private[coordinator] def cleanupGroupMetadata() {
     val startMs = time.milliseconds()
-
-    val numExpiredOffsetsRemoved = inWriteLock(offsetExpireLock) {
-      val expiredOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
-        offsetAndMetadata.expireTimestamp < startMs
-      }
-
-      debug("Found %d expired offsets.".format(expiredOffsets.size))
-
-      // delete the expired offsets from the table and generate tombstone messages to remove them from the log
-      val tombstonesForPartition = expiredOffsets.map { case (groupTopicAndPartition, offsetAndMetadata) =>
-        val offsetsPartition = partitionFor(groupTopicAndPartition.group)
-        trace("Removing expired offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
-
-        offsetsCache.remove(groupTopicAndPartition)
-
-        val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group,
-          groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
-
-        val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition)
-        (offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue))
-      }.groupBy { case (partition, tombstone) => partition }
-
-      // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
-      // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
-      tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
-        val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
-        partitionOpt.map { partition =>
-          val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
-          val messages = tombstones.map(_._2).toSeq
-
-          trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
-
-          try {
-            // do not need to require acks since even if the tombstone is lost,
-            // it will be appended again in the next purge cycle
-            partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages: _*))
-            tombstones.size
-          }
-          catch {
-            case t: Throwable =>
-              error("Failed to mark %d expired offsets for deletion in %s.".format(messages.size, appendPartition), t)
+    var offsetsRemoved = 0
+
+    groupMetadataCache.foreach { case (groupId, group) =>
+      group synchronized {
+        if (!group.is(Dead)) {
+          val offsetsPartition = partitionFor(groupId)
+
+          // delete the expired offsets from the table and generate tombstone messages to remove them from the log
+          val tombstones = group.removeExpiredOffsets(startMs).map { case (topicPartition, offsetAndMetadata) =>
+            trace("Removing expired offset and metadata for %s, %s: %s".format(groupId, topicPartition, offsetAndMetadata))
+            val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
+            val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition)
+            new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
+          }.toBuffer
+
+          val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+          partitionOpt.foreach { partition =>
+            val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+            trace("Marked %d offsets in %s for deletion.".format(tombstones.size, appendPartition))
+
+            try {
+              // do not need to require acks since even if the tombstone is lost,
+              // it will be appended again in the next purge cycle
+              partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
+              offsetsRemoved += tombstones.size
+            }
+            catch {
+              case t: Throwable =>
+                error("Failed to mark %d expired offsets for deletion in %s.".format(tombstones.size, appendPartition), t)
                 // ignore and continue
-              0
+            }
+          }
+
+          if (group.is(Empty) && !group.hasOffsets) {
+            group.transitionTo(Dead)
+            evictGroupAndDeleteMetadata(group)
+            info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
           }
         }
-    }.sum
     }
-    info("Removed %d expired offsets in %d milliseconds.".format(numExpiredOffsetsRemoved, time.milliseconds() - startMs))
+    info("Removed %d expired offsets in %d milliseconds.".format(offsetsRemoved, time.milliseconds() - startMs))
+  }
 
   private def getHighWatermark(partitionId: Int): Long = {
@@ -607,9 +621,11 @@ class GroupMetadataManager(val brokerId: Int,
     metadata == null || metadata.length() <= config.maxMetadataSize
   }
 
+
   def shutdown() {
     shuttingDown.set(true)
-    scheduler.shutdown()
+    if (scheduler.isStarted)
+      scheduler.shutdown()
 
     // TODO: clear the caches
   }
@@ -642,7 +658,7 @@ class GroupMetadataManager(val brokerId: Int,
    * NOTE: this is for test only
    */
  def addPartitionOwnership(partition: Int) {
-    loadingPartitions synchronized {
+    inLock(partitionLock) {
      ownedPartitions.add(partition)
    }
  }
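
To make the new cleanupGroupMetadata flow above easier to follow in isolation, here is a hedged, self-contained sketch of the per-group sweep: expired offsets are dropped from the group's cache and their commit keys become tombstone candidates (a tombstone being a message with the commit key and a null payload that log compaction later uses to erase the old entries), and a group that is empty and holds no offsets becomes eligible for eviction. The GroupState values, the offsets map, and the tombstoneKeys representation here are simplified stand-ins, not the coordinator's real types.

object GroupCleanupSketch {

  sealed trait GroupState
  case object Empty extends GroupState
  case object Stable extends GroupState
  case object Dead extends GroupState

  // Simplified stand-in for GroupMetadata: cached offsets keyed by "topic-partition".
  final case class Group(groupId: String,
                         state: GroupState,
                         offsets: Map[String, (Long, Long)]) // key -> (offset, expireTimestamp)

  final case class SweepResult(group: Group, tombstoneKeys: Seq[String])

  // Drop offsets whose expiration has passed, report their keys as tombstone candidates,
  // and mark the group Dead once it is Empty with nothing left to remember.
  def sweep(group: Group, nowMs: Long): SweepResult = {
    val (expired, retained) = group.offsets.partition { case (_, (_, expireTimestamp)) => expireTimestamp < nowMs }
    val swept = group.copy(offsets = retained)
    val finalGroup =
      if (swept.state == Empty && swept.offsets.isEmpty) swept.copy(state = Dead)
      else swept
    SweepResult(finalGroup, expired.keys.toSeq)
  }

  def main(args: Array[String]): Unit = {
    val g = Group("my-group", Empty, Map("topic-0" -> (10L, 500L), "topic-1" -> (20L, 2000L)))
    val result = sweep(g, nowMs = 1000L)
    // topic-0 expired -> one tombstone key; topic-1 survives, so the group stays alive.
    println(result)
  }
}
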
@@ -710,8 +726,8 @@ object GroupMetadataManager {
 
   private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(new Field("protocol_type", STRING),
                                                           new Field("generation", INT32),
-                                                          new Field("protocol", STRING),
-                                                          new Field("leader", STRING),
+                                                          new Field("protocol", NULLABLE_STRING),
+                                                          new Field("leader", NULLABLE_STRING),
                                                           new Field("members", new ArrayOf(MEMBER_METADATA_V0)))
   private val GROUP_METADATA_PROTOCOL_TYPE_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol_type")
   private val GROUP_METADATA_GENERATION_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("generation")
@@ -787,7 +803,7 @@ object GroupMetadataManager {
    *
    * @return key bytes for group metadata message
    */
-  private def groupMetadataKey(group: String): Array[Byte] = {
+  def groupMetadataKey(group: String): Array[Byte] = {
     val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
     key.set(GROUP_KEY_GROUP_FIELD, group)
 
@@ -823,10 +839,10 @@ object GroupMetadataManager {
    * @param groupMetadata
    * @return payload for offset commit message
    */
-  private def groupMetadataValue(groupMetadata: GroupMetadata, assignment: Map[String, Array[Byte]]): Array[Byte] = {
+  def groupMetadataValue(groupMetadata: GroupMetadata, assignment: Map[String, Array[Byte]]): Array[Byte] = {
     // generate commit value with schema version 1
     val value = new Struct(CURRENT_GROUP_VALUE_SCHEMA)
-    value.set(GROUP_METADATA_PROTOCOL_TYPE_V0, groupMetadata.protocolType)
+    value.set(GROUP_METADATA_PROTOCOL_TYPE_V0, groupMetadata.protocolType.getOrElse(""))
     value.set(GROUP_METADATA_GENERATION_V0, groupMetadata.generationId)
     value.set(GROUP_METADATA_PROTOCOL_V0, groupMetadata.protocol)
     value.set(GROUP_METADATA_LEADER_V0, groupMetadata.leaderId)
@@ -937,13 +953,16 @@ object GroupMetadataManager {
 
       if (version == 0) {
         val protocolType = value.get(GROUP_METADATA_PROTOCOL_TYPE_V0).asInstanceOf[String]
 
-        val group = new GroupMetadata(groupId, protocolType)
+        val memberMetadataArray = value.getArray(GROUP_METADATA_MEMBERS_V0)
+        val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
+
+        val group = new GroupMetadata(groupId, initialState)
 
         group.generationId = value.get(GROUP_METADATA_GENERATION_V0).asInstanceOf[Int]
         group.leaderId = value.get(GROUP_METADATA_LEADER_V0).asInstanceOf[String]
         group.protocol = value.get(GROUP_METADATA_PROTOCOL_V0).asInstanceOf[String]
 
-        value.getArray(GROUP_METADATA_MEMBERS_V0).foreach {
+        memberMetadataArray.foreach {
           case memberMetadataObj =>
             val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
             val memberId = memberMetadata.get(MEMBER_METADATA_MEMBER_ID_V0).asInstanceOf[String]
@@ -953,7 +972,7 @@ object GroupMetadataManager {
             val subscription = Utils.toArray(memberMetadata.get(MEMBER_METADATA_SUBSCRIPTION_V0).asInstanceOf[ByteBuffer])
 
             val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeout,
-              List((group.protocol, subscription)))
+              protocolType, List((group.protocol, subscription)))
 
             member.assignment = Utils.toArray(memberMetadata.get(MEMBER_METADATA_ASSIGNMENT_V0).asInstanceOf[ByteBuffer])
 
@@ -1012,6 +1031,9 @@ object GroupMetadataManager {
 
 }
 
+case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
+                        callback: Map[TopicPartition, PartitionResponse] => Unit)
+
 case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
 
   def this(group: String, topic: String, partition: Int) =
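
The schema and read-path changes above allow a group with no members to be persisted: protocol and leader become nullable, the protocol type is written as an empty string when absent, and on read the initial state is inferred from whether any members were serialized. The sketch below mirrors only that inference step with plain Scala types; GroupRecord and its fields are illustrative stand-ins for the Struct-based schema in the patch.

object GroupMetadataDecodeSketch {

  sealed trait GroupState
  case object Empty extends GroupState
  case object Stable extends GroupState

  // Illustrative flattened view of a persisted group metadata record.
  final case class GroupRecord(protocolType: String,          // "" when the group has no protocol type
                               generation: Int,
                               protocol: Option[String],      // nullable in the on-disk schema
                               leader: Option[String],        // nullable in the on-disk schema
                               memberIds: Seq[String])

  final case class DecodedGroup(groupId: String,
                                initialState: GroupState,
                                generation: Int,
                                protocol: Option[String],
                                leader: Option[String])

  // A record with no serialized members comes back as an Empty group; otherwise it is taken to be
  // Stable, matching the memberMetadataArray.isEmpty check in the patch.
  def decode(groupId: String, record: GroupRecord): DecodedGroup = {
    val initialState: GroupState = if (record.memberIds.isEmpty) Empty else Stable
    DecodedGroup(groupId, initialState, record.generation, record.protocol, record.leader)
  }

  def main(args: Array[String]): Unit = {
    val emptyGroup = GroupRecord(protocolType = "", generation = 3, protocol = None, leader = None, memberIds = Nil)
    println(decode("my-group", emptyGroup)) // DecodedGroup(my-group, Empty, 3, None, None)
  }
}
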

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
index c57b990..19c9e8e 100644
--- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -56,6 +56,7 @@ private[coordinator] class MemberMetadata(val memberId: String,
                                           val clientId: String,
                                           val clientHost: String,
                                           val sessionTimeoutMs: Int,
+                                          val protocolType: String,
                                           var supportedProtocols: List[(String, Array[Byte])]) {
 
   var assignment: Array[Byte] = Array.empty[Byte]

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index dc343fa..fa13a92 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -24,7 +24,7 @@ import kafka.common.OffsetAndMetadata
 import kafka.message.{Message, MessageSet}
 import kafka.server.{DelayedOperationPurgatory, ReplicaManager, KafkaConfig}
 import kafka.utils._
-import org.apache.kafka.common.{utils, TopicPartition}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -96,7 +96,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
 
     groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
-    groupCoordinator.startup()
+    groupCoordinator.startup(false)
 
     // add the partition into the owned partition list
     groupPartitionId = groupCoordinator.partitionFor(groupId)
@@ -106,7 +106,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   @After
   def tearDown() {
     EasyMock.reset(replicaManager)
-    groupCoordinator.shutdown()
+    if (groupCoordinator != null)
+      groupCoordinator.shutdown()
   }
 
   @Test
