This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 831bbd4 KAFKA-9232; Coordinator new member timeout does not work for
JoinGroup v3 and below (#7753)
831bbd4 is described below
commit 831bbd4489c36207042d3b70b30c01075aca895f
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Mon Dec 23 18:55:48 2019 -0500
KAFKA-9232; Coordinator new member timeout does not work for JoinGroup v3
and below (#7753)
The JoinGroup logic for v3 and below does not properly complete the initial
heartbeat for new members, which then expires after the static 5-minute
timeout if the member does not rejoin. The core issue is in the
`shouldKeepAlive` method, which returns false when it should return true
because of an inconsistent timeout check.
Reviewers: Jason Gustafson <[email protected]>
---
.../kafka/coordinator/group/GroupMetadata.scala | 1 +
.../kafka/coordinator/group/MemberMetadata.scala | 14 ++++++++--
.../coordinator/group/GroupCoordinatorTest.scala | 32 +++++++++++++++++++++-
3 files changed, 43 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index e2d9c5f..b075125 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -200,6 +200,7 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
def not(groupState: GroupState) = state != groupState
def has(memberId: String) = members.contains(memberId)
def get(memberId: String) = members(memberId)
+ def size = members.size
def isLeader(memberId: String): Boolean = leaderId.contains(memberId)
def leaderOrNull: String = leaderId.orNull
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
index 8649b3e..4c7a50e 100644
--- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -67,6 +67,8 @@ private[group] class MemberMetadata(val memberId: String,
var isNew: Boolean = false
def protocols = supportedProtocols.map(_._1).toSet
+ def isAwaitingJoin = awaitingJoinCallback != null
+ def isAwaitingSync = awaitingSyncCallback != null
/**
* Get metadata corresponding to the provided protocol.
@@ -80,10 +82,16 @@ private[group] class MemberMetadata(val memberId: String,
}
def shouldKeepAlive(deadlineMs: Long): Boolean = {
- if (awaitingJoinCallback != null)
- !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs >
deadlineMs
- else awaitingSyncCallback != null ||
+ if (isNew) {
+ // New members are expired after the static join timeout
+ latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > deadlineMs
+ } else if (isAwaitingJoin || isAwaitingSync) {
+ // Don't remove members as long as they have a request in purgatory
+ true
+ } else {
+ // Otherwise check for session expiration
latestHeartbeat + sessionTimeoutMs > deadlineMs
+ }
}
/**
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 1ef695e..14b3957 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -263,6 +263,36 @@ class GroupCoordinatorTest extends JUnitSuite {
}
@Test
+ def testNewMemberTimeoutCompletion(): Unit = {
+ val sessionTimeout = GroupCoordinator.NewMemberJoinTimeoutMs + 5000
+ val responseFuture = sendJoinGroup(groupId,
JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
DefaultRebalanceTimeout, sessionTimeout)
+
+ timer.advanceClock(GroupInitialRebalanceDelay + 1)
+
+ val joinResult = Await.result(responseFuture,
Duration(DefaultRebalanceTimeout + 100, TimeUnit.MILLISECONDS))
+ val group = groupCoordinator.groupManager.getGroup(groupId).get
+ val memberId = joinResult.memberId
+
+ assertEquals(Errors.NONE, joinResult.error)
+ assertEquals(0, group.allMemberMetadata.count(_.isNew))
+
+ EasyMock.reset(replicaManager)
+ val syncGroupResult = syncGroupLeader(groupId, joinResult.generationId,
memberId, Map(memberId -> Array[Byte]()))
+ val syncGroupError = syncGroupResult._2
+
+ assertEquals(Errors.NONE, syncGroupError)
+ assertEquals(1, group.size)
+
+ timer.advanceClock(GroupCoordinator.NewMemberJoinTimeoutMs + 100)
+
+ // Make sure the NewMemberTimeout is not still in effect, and the member
is not kicked
+ assertEquals(1, group.size)
+
+ timer.advanceClock(sessionTimeout + 100)
+ assertEquals(0, group.size)
+ }
+
+ @Test
def testNewMemberJoinExpiration(): Unit = {
// This tests new member expiration during a protracted rebalance. We
first create a
// group with one member which uses a large value for session timeout and
rebalance timeout.
@@ -765,7 +795,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val nextGenerationId = joinResult.generationId
- // with no leader SyncGroup, the follower's request should failure with an
error indicating
+ // with no leader SyncGroup, the follower's request should fail with an
error indicating
// that it should rejoin
EasyMock.reset(replicaManager)
val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId,
otherJoinResult.memberId)