This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 8a4532c  KAFKA-7610; Proactively timeout new group members if rebalance is delayed (#5962)
8a4532c is described below

commit 8a4532cf7b5e61892a79cbd83b40b3c39788198b
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Dec 10 14:32:29 2018 -0800

    KAFKA-7610; Proactively timeout new group members if rebalance is delayed (#5962)

    When a consumer first joins a group, it doesn't have an assigned memberId. If the
    rebalance is delayed for some reason, the client may disconnect after a request
    timeout and retry. Since the client has not yet received its memberId, we have no
    way to detect the retry and expire the previously generated member id. This can
    lead to unbounded growth in the size of the group until the rebalance completes.
    This patch fixes the problem by proactively completing all JoinGroup requests for
    new members after a timeout of 5 minutes. If the client is still around, we expect
    it to retry.

    Reviewers: Stanislav Kozlovski <stanislav_kozlov...@outlook.com>, Boyang Chen <bche...@outlook.com>, Guozhang Wang <wangg...@gmail.com>
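In short, the patch replaces the session-timeout check for members still awaiting their first JoinGroup response with a fixed five-minute deadline. A minimal, self-contained sketch of that rule follows; the names mirror shouldKeepAlive and NewMemberJoinTimeoutMs from the diff below, but this is an illustrative model, not the actual coordinator code, which also involves the heartbeat purgatory:

    object NewMemberExpirationSketch {
      // Fixed join timeout for members that have not yet received a memberId
      // (the value this patch adds to object GroupCoordinator).
      val NewMemberJoinTimeoutMs: Long = 5 * 60 * 1000

      // Simplified stand-in for MemberMetadata: just the fields consulted by
      // the new keep-alive check.
      final case class Member(isNew: Boolean,
                              latestHeartbeatMs: Long,
                              sessionTimeoutMs: Long,
                              awaitingJoin: Boolean,
                              awaitingSync: Boolean) {
        // A new member with a pending JoinGroup survives only until the fixed
        // join timeout; established members fall back to the session timeout.
        def shouldKeepAlive(deadlineMs: Long): Boolean =
          if (awaitingJoin)
            !isNew || latestHeartbeatMs + NewMemberJoinTimeoutMs > deadlineMs
          else
            awaitingSync || latestHeartbeatMs + sessionTimeoutMs > deadlineMs
      }

      def main(args: Array[String]): Unit = {
        // A brand-new joiner with a 10-minute session timeout still expires
        // once the 5-minute join timeout has elapsed.
        val m = Member(isNew = true, latestHeartbeatMs = 0L,
          sessionTimeoutMs = 10 * 60 * 1000, awaitingJoin = true, awaitingSync = false)
        println(m.shouldKeepAlive(NewMemberJoinTimeoutMs - 1)) // true
        println(m.shouldKeepAlive(NewMemberJoinTimeoutMs + 1)) // false
      }
    }

When the deadline fires and the check fails, the coordinator completes the pending JoinGroup with UNKNOWN_MEMBER_ID (see removeMemberAndUpdateGroup in the diff), so a client that is still alive simply retries the join.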
---
 .../kafka/coordinator/group/DelayedHeartbeat.scala | 10 ++--
 .../kafka/coordinator/group/GroupCoordinator.scala | 41 +++++++++++-----
 .../kafka/coordinator/group/GroupMetadata.scala    | 12 ++---
 .../kafka/coordinator/group/MemberMetadata.scala   |  8 ++++
 .../coordinator/group/GroupCoordinatorTest.scala   | 55 +++++++++++++++++++---
 5 files changed, 98 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index 5f16acb..9377518 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -26,11 +26,11 @@ import kafka.server.DelayedOperation
 private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                       group: GroupMetadata,
                                       member: MemberMetadata,
-                                      heartbeatDeadline: Long,
-                                      sessionTimeout: Long)
-  extends DelayedOperation(sessionTimeout, Some(group.lock)) {
+                                      deadline: Long,
+                                      timeoutMs: Long)
+  extends DelayedOperation(timeoutMs, Some(group.lock)) {

-  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _)
-  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
+  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, deadline, forceComplete _)
+  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, deadline)
   override def onComplete() = coordinator.onCompleteHeartbeat()
 }
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index db89f14..007c6ee 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -600,7 +600,7 @@ class GroupCoordinator(val brokerId: Int,
       case Empty | Dead =>
       case PreparingRebalance =>
         for (member <- group.allMemberMetadata) {
-          group.invokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR))
+          group.maybeInvokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR))
         }

         joinPurgatory.checkAndComplete(GroupKey(group.groupId))
@@ -674,14 +674,18 @@ class GroupCoordinator(val brokerId: Int,
   /**
    * Complete existing DelayedHeartbeats for the given member and schedule the next one
    */
   private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
+    completeAndScheduleNextExpiration(group, member, member.sessionTimeoutMs)
+  }
+
+  private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
     // complete current heartbeat expectation
     member.latestHeartbeat = time.milliseconds()
     val memberKey = MemberKey(member.groupId, member.memberId)
     heartbeatPurgatory.checkAndComplete(memberKey)

     // reschedule the next heartbeat expiration deadline
-    val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
-    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
+    val deadline = member.latestHeartbeat + timeoutMs
+    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, deadline, timeoutMs)
     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
   }
@@ -702,11 +706,23 @@ class GroupCoordinator(val brokerId: Int,
     val memberId = clientId + "-" + group.generateMemberIdSuffix
     val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, protocols)
+
+    member.isNew = true
+
     // update the newMemberAdded flag to indicate that the join group can be further delayed
     if (group.is(PreparingRebalance) && group.generationId == 0)
       group.newMemberAdded = true

     group.add(member, callback)
+
+    // The session timeout does not affect new members since they do not have their memberId and
+    // cannot send heartbeats. Furthermore, we cannot detect disconnects because sockets are muted
+    // while the JoinGroup is in purgatory. If the client does disconnect (e.g. because of a request
+    // timeout during a long rebalance), it may simply retry, which will lead to a lot of defunct
+    // members in the rebalance. To prevent this from going on indefinitely, we time out JoinGroup
+    // requests for new members. If the new member is still there, we expect it to retry.
+    completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
+
     maybePrepareRebalance(group, s"Adding new member $memberId")
     member
   }
@@ -751,7 +767,13 @@ class GroupCoordinator(val brokerId: Int,
   }

   private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String) {
+    // New members may time out with a pending JoinGroup while the group is still rebalancing, so we
+    // have to invoke the callback before removing the member. We return UNKNOWN_MEMBER_ID so that
+    // the consumer will retry the JoinGroup request if it is still active.
+    group.maybeInvokeJoinCallback(member, joinError(NoMemberId, Errors.UNKNOWN_MEMBER_ID))
+
     group.remove(member.memberId)
+
     group.currentState match {
       case Dead | Empty =>
       case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
@@ -813,8 +835,9 @@ class GroupCoordinator(val brokerId: Int,
             leaderId = group.leaderOrNull,
             error = Errors.NONE)

-          group.invokeJoinCallback(member, joinResult)
+          group.maybeInvokeJoinCallback(member, joinResult)
           completeAndScheduleNextHeartbeatExpiration(group, member)
+          member.isNew = false
         }
       }
     }
@@ -823,7 +846,7 @@ class GroupCoordinator(val brokerId: Int,
   def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
     group.inLock {
-      if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
+      if (member.shouldKeepAlive(heartbeatDeadline) || member.isLeaving)
         forceComplete()
       else false
     }
@@ -831,7 +854,7 @@ class GroupCoordinator(val brokerId: Int,

   def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
     group.inLock {
-      if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
+      if (!member.shouldKeepAlive(heartbeatDeadline)) {
         info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
         removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
       }
@@ -844,11 +867,6 @@ class GroupCoordinator(val brokerId: Int,

   def partitionFor(group: String): Int = groupManager.partitionFor(group)

-  private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
-    member.awaitingJoinCallback != null ||
-      member.awaitingSyncCallback != null ||
-      member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
-
   private def isCoordinatorForGroup(groupId: String) = groupManager.isGroupLocal(groupId)

   private def isCoordinatorLoadInProgress(groupId: String) = groupManager.isGroupLoading(groupId)
@@ -865,6 +883,7 @@ object GroupCoordinator {
   val NoMembers = List[MemberSummary]()
   val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
   val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
+  val NewMemberJoinTimeoutMs: Int = 5 * 60 * 1000

   def apply(config: KafkaConfig,
             zkClient: KafkaZkClient,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index cbe78e9..e2d9c5f 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -220,7 +220,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }
     member.awaitingJoinCallback = callback
     if (member.awaitingJoinCallback != null)
-      numMembersAwaitingJoin += 1;
+      numMembersAwaitingJoin += 1
   }

   def remove(memberId: String) {
@@ -300,19 +300,19 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     member.supportedProtocols = protocols

     if (callback != null && member.awaitingJoinCallback == null) {
-      numMembersAwaitingJoin += 1;
+      numMembersAwaitingJoin += 1
     } else if (callback == null && member.awaitingJoinCallback != null) {
-      numMembersAwaitingJoin -= 1;
+      numMembersAwaitingJoin -= 1
     }
     member.awaitingJoinCallback = callback
   }

-  def invokeJoinCallback(member: MemberMetadata,
-                         joinGroupResult: JoinGroupResult) : Unit = {
+  def maybeInvokeJoinCallback(member: MemberMetadata,
+                              joinGroupResult: JoinGroupResult) : Unit = {
     if (member.awaitingJoinCallback != null) {
       member.awaitingJoinCallback(joinGroupResult)
       member.awaitingJoinCallback = null
-      numMembersAwaitingJoin -= 1;
+      numMembersAwaitingJoin -= 1
     }
   }

diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index b082b9b..8649b3e 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -64,6 +64,7 @@ private[group] class MemberMetadata(val memberId: String,
   var awaitingSyncCallback: (Array[Byte], Errors) => Unit = null
   var latestHeartbeat: Long = -1
   var isLeaving: Boolean = false
+  var isNew: Boolean = false

   def protocols = supportedProtocols.map(_._1).toSet

@@ -78,6 +79,13 @@ private[group] class MemberMetadata(val memberId: String,
     }
   }

+  def shouldKeepAlive(deadlineMs: Long): Boolean = {
+    if (awaitingJoinCallback != null)
+      !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
+    else awaitingSyncCallback != null ||
+      latestHeartbeat + sessionTimeoutMs > deadlineMs
+  }
+
   /**
    * Check if the provided protocol metadata matches the currently stored metadata.
    */
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index c162342..1ef695e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -56,8 +56,8 @@ class GroupCoordinatorTest extends JUnitSuite {
   val ClientId = "consumer-test"
   val ClientHost = "localhost"
-  val ConsumerMinSessionTimeout = 10
-  val ConsumerMaxSessionTimeout = 1000
+  val GroupMinSessionTimeout = 10
+  val GroupMaxSessionTimeout = 10 * 60 * 1000
   val DefaultRebalanceTimeout = 500
   val DefaultSessionTimeout = 500
   val GroupInitialRebalanceDelay = 50
@@ -80,8 +80,8 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Before
   def setUp() {
     val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
-    props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
-    props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
+    props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, GroupMinSessionTimeout.toString)
+    props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, GroupMaxSessionTimeout.toString)
     props.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, GroupInitialRebalanceDelay.toString)
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
     val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
@@ -194,7 +194,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testJoinGroupSessionTimeoutTooSmall() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID

-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = ConsumerMinSessionTimeout - 1)
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMinSessionTimeout - 1)
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
   }
@@ -203,7 +203,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testJoinGroupSessionTimeoutTooLarge() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID

-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = ConsumerMaxSessionTimeout + 1)
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = GroupMaxSessionTimeout + 1)
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
   }
@@ -263,6 +263,49 @@ class GroupCoordinatorTest extends JUnitSuite {
   }

   @Test
+  def testNewMemberJoinExpiration(): Unit = {
+    // This tests new member expiration during a protracted rebalance. We first create a
+    // group with one member which uses a large value for session timeout and rebalance timeout.
+    // We then join with one new member and let the rebalance hang while we await the first member.
+    // The new member join timeout expires and its JoinGroup request is failed.
+
+    val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000
+    val rebalanceTimeout = GroupCoordinator.NewMemberJoinTimeoutMs * 2
+
+    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+      sessionTimeout, rebalanceTimeout)
+    val firstMemberId = firstJoinResult.memberId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE, firstJoinResult.error)
+
+    val groupOpt = groupCoordinator.groupManager.getGroup(groupId)
+    assertTrue(groupOpt.isDefined)
+    val group = groupOpt.get
+    assertEquals(0, group.allMemberMetadata.count(_.isNew))
+
+    EasyMock.reset(replicaManager)
+
+    val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+      rebalanceTimeout, sessionTimeout)
+    assertFalse(responseFuture.isCompleted)
+
+    assertEquals(2, group.allMembers.size)
+    assertEquals(1, group.allMemberMetadata.count(_.isNew))
+
+    val newMember = group.allMemberMetadata.find(_.isNew).get
+    assertNotEquals(firstMemberId, newMember.memberId)
+
+    timer.advanceClock(GroupCoordinator.NewMemberJoinTimeoutMs + 1)
+    assertTrue(responseFuture.isCompleted)
+
+    val response = Await.result(responseFuture, Duration(0, TimeUnit.MILLISECONDS))
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, response.error)
+    assertEquals(1, group.allMembers.size)
+    assertEquals(0, group.allMemberMetadata.count(_.isNew))
+    assertEquals(firstMemberId, group.allMembers.head)
+  }
+
+  @Test
   def testJoinGroupInconsistentGroupProtocol() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID