This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push: new 3b3eb70 KAFKA-9752; New member timeout can leave group rebalance stuck (#8339) 3b3eb70 is described below commit 3b3eb705cdcb876e23e7aa4c2086de6244b0b52b Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Tue Mar 24 22:16:49 2020 -0700 KAFKA-9752; New member timeout can leave group rebalance stuck (#8339) Older versions of the JoinGroup protocol rely on a new member timeout to keep the group from growing indefinitely in the case of client disconnects and retries. The logic for resetting the heartbeat expiration task following completion of the rebalance failed to account for an implicit expectation that shouldKeepAlive would return false the first time it is invoked when a heartbeat expiration is scheduled. This patch fixes the issue by making heartbeat satisfaction logic explicit. Reviewers: Chia-Ping Tsai <chia7...@gmail.com>, Guozhang Wang <wangg...@gmail.com>, Rajini Sivaram <rajinisiva...@googlemail.com> --- .../kafka/coordinator/group/DelayedHeartbeat.scala | 5 +- .../kafka/coordinator/group/GroupCoordinator.scala | 39 ++++---- .../kafka/coordinator/group/MemberMetadata.scala | 20 ++-- .../coordinator/group/GroupCoordinatorTest.scala | 111 +++++++++++++++++++-- 4 files changed, 136 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala index 09c5eea..3f402d9 100644 --- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala +++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala @@ -27,11 +27,10 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator, group: GroupMetadata, memberId: String, isPending: Boolean, - deadline: Long, timeoutMs: Long) extends DelayedOperation(timeoutMs, Some(group.lock)) { - override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, memberId, isPending, deadline, forceComplete _) 
- override def onExpiration() = coordinator.onExpireHeartbeat(group, memberId, isPending, deadline) + override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, memberId, isPending, forceComplete _) + override def onExpiration() = coordinator.onExpireHeartbeat(group, memberId, isPending) override def onComplete() = coordinator.onCompleteHeartbeat() } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 1e1a759..bbd4766 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -772,15 +772,16 @@ class GroupCoordinator(val brokerId: Int, completeAndScheduleNextExpiration(group, member, member.sessionTimeoutMs) } - private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long) { - // complete current heartbeat expectation - member.latestHeartbeat = time.milliseconds() + private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = { val memberKey = MemberKey(member.groupId, member.memberId) + + // complete current heartbeat expectation + member.heartbeatSatisfied = true heartbeatPurgatory.checkAndComplete(memberKey) // reschedule the next heartbeat expiration deadline - val deadline = member.latestHeartbeat + timeoutMs - val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, deadline, timeoutMs) + member.heartbeatSatisfied = false + val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, timeoutMs) heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey)) } @@ -789,8 +790,7 @@ class GroupCoordinator(val brokerId: Int, */ private def addPendingMemberExpiration(group: GroupMetadata, pendingMemberId: String, timeoutMs: Long) { val pendingMemberKey = 
MemberKey(group.groupId, pendingMemberId) - val deadline = time.milliseconds() + timeoutMs - val delayedHeartbeat = new DelayedHeartbeat(this, group, pendingMemberId, isPending = true, deadline, timeoutMs) + val delayedHeartbeat = new DelayedHeartbeat(this, group, pendingMemberId, isPending = true, timeoutMs) heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(pendingMemberKey)) } @@ -973,7 +973,10 @@ class GroupCoordinator(val brokerId: Int, } } - def tryCompleteHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long, forceComplete: () => Boolean) = { + def tryCompleteHeartbeat(group: GroupMetadata, + memberId: String, + isPending: Boolean, + forceComplete: () => Boolean): Boolean = { group.inLock { // The group has been unloaded and invalid, we should complete the heartbeat. if (group.is(Dead)) { @@ -983,25 +986,23 @@ class GroupCoordinator(val brokerId: Int, if (group.has(memberId)) { forceComplete() } else false - } else { - if (shouldCompleteNonPendingHeartbeat(group, memberId, heartbeatDeadline)) { - forceComplete() - } else false - } + } else if (shouldCompleteNonPendingHeartbeat(group, memberId)) { + forceComplete() + } else false } } - def shouldCompleteNonPendingHeartbeat(group: GroupMetadata, memberId: String, heartbeatDeadline: Long): Boolean = { + def shouldCompleteNonPendingHeartbeat(group: GroupMetadata, memberId: String): Boolean = { if (group.has(memberId)) { val member = group.get(memberId) - member.shouldKeepAlive(heartbeatDeadline) || member.isLeaving + member.hasSatisfiedHeartbeat || member.isLeaving } else { - info(s"Member id $memberId was not found in ${group.groupId} during heartbeat expiration.") - false + info(s"Member id $memberId was not found in ${group.groupId} during heartbeat completion check") + true } } - def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long): Unit = { + def onExpireHeartbeat(group: GroupMetadata, memberId: String, 
isPending: Boolean): Unit = { group.inLock { if (group.is(Dead)) { info(s"Received notification of heartbeat expiration for member $memberId after group ${group.groupId} had already been unloaded or deleted.") @@ -1012,7 +1013,7 @@ class GroupCoordinator(val brokerId: Int, debug(s"Member $memberId has already been removed from the group.") } else { val member = group.get(memberId) - if (!member.shouldKeepAlive(heartbeatDeadline)) { + if (!member.hasSatisfiedHeartbeat) { info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group") removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration") } diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala index fc90c95..c73b0b3 100644 --- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala @@ -67,11 +67,17 @@ private[group] class MemberMetadata(var memberId: String, var assignment: Array[Byte] = Array.empty[Byte] var awaitingJoinCallback: JoinGroupResult => Unit = null var awaitingSyncCallback: SyncGroupResult => Unit = null - var latestHeartbeat: Long = -1 var isLeaving: Boolean = false var isNew: Boolean = false val isStaticMember: Boolean = groupInstanceId.isDefined + // This variable is used to track heartbeat completion through the delayed + // heartbeat purgatory. When scheduling a new heartbeat expiration, we set + // this value to `false`. Upon receiving the heartbeat (or any other event + // indicating the liveness of the client), we set it to `true` so that the + // delayed heartbeat can be completed. 
+ var heartbeatSatisfied: Boolean = false + def isAwaitingJoin = awaitingJoinCallback != null def isAwaitingSync = awaitingSyncCallback != null @@ -86,16 +92,16 @@ private[group] class MemberMetadata(var memberId: String, } } - def shouldKeepAlive(deadlineMs: Long): Boolean = { + def hasSatisfiedHeartbeat: Boolean = { if (isNew) { - // New members are expired after the static join timeout - latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs + // New members can be expired while awaiting join, so we have to check this first + heartbeatSatisfied } else if (isAwaitingJoin || isAwaitingSync) { - // Don't remove members as long as they have a request in purgatory + // Members that are awaiting a rebalance automatically satisfy expected heartbeats true } else { - // Otherwise check for session expiration - latestHeartbeat + sessionTimeoutMs > deadlineMs + // Otherwise we require the next heartbeat + heartbeatSatisfied } } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index d62a123..3b79e98 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -379,7 +379,96 @@ class GroupCoordinatorTest { } @Test - def testJoinGroupInconsistentGroupProtocol() { + def testNewMemberFailureAfterJoinGroupCompletion(): Unit = { + // For old versions of the JoinGroup protocol, new members were subject + // to expiration if the rebalance took long enough. This test case ensures + // that following completion of the JoinGroup phase, new members follow + // normal heartbeat expiration logic. 
+ + val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + val firstMemberId = firstJoinResult.memberId + val firstGenerationId = firstJoinResult.generationId + assertEquals(firstMemberId, firstJoinResult.leaderId) + assertEquals(Errors.NONE, firstJoinResult.error) + + EasyMock.reset(replicaManager) + val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, + Map(firstMemberId -> Array[Byte]())) + assertEquals(Errors.NONE, firstSyncResult._2) + + EasyMock.reset(replicaManager) + val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + + EasyMock.reset(replicaManager) + val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols, + requireKnownMemberId = false) + + val joinResult = await(joinFuture, DefaultSessionTimeout+100) + val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) + assertEquals(Errors.NONE, joinResult.error) + assertEquals(Errors.NONE, otherJoinResult.error) + + verifySessionExpiration(groupId) + } + + @Test + def testNewMemberFailureAfterSyncGroupCompletion(): Unit = { + // For old versions of the JoinGroup protocol, new members were subject + // to expiration if the rebalance took long enough. This test case ensures + // that following completion of the SyncGroup phase, new members follow + // normal heartbeat expiration logic. 
+ + val firstJoinResult = dynamicJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + val firstMemberId = firstJoinResult.memberId + val firstGenerationId = firstJoinResult.generationId + assertEquals(firstMemberId, firstJoinResult.leaderId) + assertEquals(Errors.NONE, firstJoinResult.error) + + EasyMock.reset(replicaManager) + val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, + Map(firstMemberId -> Array[Byte]())) + assertEquals(Errors.NONE, firstSyncResult._2) + + EasyMock.reset(replicaManager) + val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + + EasyMock.reset(replicaManager) + val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols, + requireKnownMemberId = false) + + val joinResult = await(joinFuture, DefaultSessionTimeout+100) + val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) + assertEquals(Errors.NONE, joinResult.error) + assertEquals(Errors.NONE, otherJoinResult.error) + val secondGenerationId = joinResult.generationId + val secondMemberId = otherJoinResult.memberId + + EasyMock.reset(replicaManager) + sendSyncGroupFollower(groupId, secondGenerationId, secondMemberId) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, secondGenerationId, firstMemberId, + Map(firstMemberId -> Array.emptyByteArray, secondMemberId -> Array.emptyByteArray)) + assertEquals(Errors.NONE, syncGroupResult._2) + + verifySessionExpiration(groupId) + } + + private def verifySessionExpiration(groupId: String): Unit = { + EasyMock.reset(replicaManager) + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())) + .andReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)).anyTimes() + EasyMock.replay(replicaManager) + + timer.advanceClock(DefaultSessionTimeout + 1) + + val groupMetadata = group(groupId) + assertEquals(Empty, groupMetadata.currentState) + assertEquals(0, 
groupMetadata.allMembers.size) + } + + @Test + def testJoinGroupInconsistentGroupProtocol(): Unit = { val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID @@ -667,12 +756,14 @@ class GroupCoordinatorTest { } @Test - def staticMemberRejoinWithLeaderIdAndKnownMemberId() { - val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId, sessionTimeout = DefaultRebalanceTimeout / 2) + def staticMemberRejoinWithLeaderIdAndKnownMemberId(): Unit = { + val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId, + sessionTimeout = DefaultRebalanceTimeout / 2) // A static leader with known id rejoin will trigger rebalance. EasyMock.reset(replicaManager) - val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = DefaultRebalanceTimeout + 1) + val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, + protocolType, protocolSuperset, clockAdvance = DefaultRebalanceTimeout + 1) // Timeout follower in the meantime. 
assertFalse(getGroup(groupId).hasStaticMember(followerInstanceId)) checkJoinGroupResult(joinGroupResult, @@ -2699,8 +2790,8 @@ class GroupCoordinatorTest { val group = getGroup(groupId) group.transitionTo(Dead) val leaderMemberId = rebalanceResult.leaderId - assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout, () => true)) - groupCoordinator.onExpireHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout) + assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, () => true)) + groupCoordinator.onExpireHeartbeat(group, leaderMemberId, false) assertTrue(group.has(leaderMemberId)) } @@ -2712,8 +2803,7 @@ class GroupCoordinatorTest { val group = getGroup(groupId) val leaderMemberId = rebalanceResult.leaderId group.remove(leaderMemberId) - assertFalse(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout, () => true)) - groupCoordinator.onExpireHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout) + assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, () => true)) } private def getGroup(groupId: String): GroupMetadata = { @@ -2800,12 +2890,13 @@ class GroupCoordinatorTest { private def sendSyncGroupFollower(groupId: String, generation: Int, memberId: String, - groupInstanceId: Option[String]): Future[SyncGroupCallbackParams] = { + groupInstanceId: Option[String] = None): Future[SyncGroupCallbackParams] = { val (responseFuture, responseCallback) = setupSyncGroupCallback EasyMock.replay(replicaManager) - groupCoordinator.handleSyncGroup(groupId, generation, memberId, groupInstanceId, Map.empty[String, Array[Byte]], responseCallback) + groupCoordinator.handleSyncGroup(groupId, generation, memberId, groupInstanceId, + Map.empty[String, Array[Byte]], responseCallback) responseFuture }