This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 79341a1  KAFKA-8715; Fix buggy reliance on state timestamp in static member.id generation (#7116)
79341a1 is described below

commit 79341a12f235510d9f78af24d979102c3d8882e1
Author: Boyang Chen <[email protected]>
AuthorDate: Fri Jul 26 15:31:31 2019 -0700

    KAFKA-8715; Fix buggy reliance on state timestamp in static member.id generation (#7116)
    
    The bug is that we accidentally used the group's current state timestamp
    instead of the real current time when generating a static member.id. When
    a group is first loaded, this timestamp is not initialized, so this
    resulted in a `NoSuchElementException`. Additionally, this violated the
    intended uniqueness of the member.id, which could have broken group
    instance fencing. The fix generates the member.id suffix from a random
    UUID instead, and a unit test is added to verify that a static member
    receives a different member.id, prefixed with its group instance id,
    after each rejoin.
    
    Reviewers: Ismael Juma <[email protected]>, Guozhang Wang <[email protected]>, Jason Gustafson <[email protected]>
---
 .../scala/kafka/coordinator/group/GroupMetadata.scala   |  2 +-
 .../kafka/coordinator/group/GroupCoordinatorTest.scala  | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 58a68a2..4d283bc8 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -365,7 +365,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
       case None =>
         clientId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
       case Some(instanceId) =>
-        instanceId + GroupMetadata.MemberIdDelimiter + 
currentStateTimestamp.get
+        instanceId + GroupMetadata.MemberIdDelimiter + 
UUID.randomUUID().toString
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 0a7bcc7..d72bfc8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -874,6 +874,23 @@ class GroupCoordinatorTest {
   }
 
   @Test
+  def shouldGetDifferentStaticMemberIdAfterEachRejoin(): Unit = {
+    val initialResult = staticMembersJoinAndRebalance(leaderInstanceId, 
followerInstanceId)
+
+    val timeAdvance = 1
+    var lastMemberId = initialResult.leaderId
+    for (_ <- 1 to 5) {
+      EasyMock.reset(replicaManager)
+
+      val joinGroupResult = staticJoinGroup(groupId, 
JoinGroupRequest.UNKNOWN_MEMBER_ID,
+        leaderInstanceId, protocolType, protocols, clockAdvance = timeAdvance)
+      assertTrue(joinGroupResult.memberId.startsWith(leaderInstanceId.get))
+      assertNotEquals(lastMemberId, joinGroupResult.memberId)
+      lastMemberId = joinGroupResult.memberId
+    }
+  }
+
+  @Test
   def testOffsetCommitDeadGroup() {
     val memberId = "memberId"
 

Reply via email to