This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fb1f749  KAFKA-8386; Use COORDINATOR_NOT_AVAILABLE error when group is Dead (#6762)
fb1f749 is described below

commit fb1f74958d1431af7c45a4d499b3b6ffef0bf70e
Author: Boyang Chen <[email protected]>
AuthorDate: Wed Jun 5 14:20:04 2019 -0700

    KAFKA-8386; Use COORDINATOR_NOT_AVAILABLE error when group is Dead (#6762)
    
    The Dead state in the coordinator is used for groups which are either pending deletion or migration to a new coordinator. Currently, requests received while in this state result in an UNKNOWN_MEMBER_ID error, which causes consumers to reset the memberId. This is a problem for KIP-345 since it can cause an older member to fence a newer member. This patch changes the error code returned in this state to COORDINATOR_NOT_AVAILABLE, which causes the consumer to rediscover the coordinator, but not [...]
    
    Reviewers: Guozhang Wang <[email protected]>, Jason Gustafson <[email protected]>
---
 .../kafka/coordinator/group/GroupCoordinator.scala | 28 +++++------
 .../coordinator/group/GroupCoordinatorTest.scala   | 56 +++++++++++++++++++---
 2 files changed, 61 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index daf38f3..5d40e9b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -170,8 +170,8 @@ class GroupCoordinator(val brokerId: Int,
         // if the group is marked as dead, it means some other thread has just 
removed the group
         // from the coordinator metadata; it is likely that the group has 
migrated to some other
         // coordinator OR the group is in a transient unstable phase. Let the 
member retry
-        // joining without the specified member id.
-        responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
Errors.UNKNOWN_MEMBER_ID))
+        // finding the correct coordinator and rejoin.
+        responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
Errors.COORDINATOR_NOT_AVAILABLE))
       } else if (!group.supportsProtocols(protocolType, 
MemberMetadata.plainProtocolSet(protocols))) {
         responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
Errors.INCONSISTENT_GROUP_PROTOCOL))
       } else {
@@ -235,8 +235,8 @@ class GroupCoordinator(val brokerId: Int,
         // if the group is marked as dead, it means some other thread has just 
removed the group
         // from the coordinator metadata; this is likely that the group has 
migrated to some other
         // coordinator OR the group is in a transient unstable phase. Let the 
member retry
-        // joining without the specified member id.
-        responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+        // finding the correct coordinator and rejoin.
+        responseCallback(joinError(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
       } else if (!group.supportsProtocols(protocolType, 
MemberMetadata.plainProtocolSet(protocols))) {
         responseCallback(joinError(memberId, 
Errors.INCONSISTENT_GROUP_PROTOCOL))
       } else if (group.isPendingMember(memberId)) {
@@ -351,8 +351,8 @@ class GroupCoordinator(val brokerId: Int,
         // if the group is marked as dead, it means some other thread has just 
removed the group
         // from the coordinator metadata; this is likely that the group has 
migrated to some other
         // coordinator OR the group is in a transient unstable phase. Let the 
member retry
-        // joining without the specified member id.
-        responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
+        // finding the correct coordinator and rejoin.
+        responseCallback(Array.empty, Errors.COORDINATOR_NOT_AVAILABLE)
       } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
         responseCallback(Array.empty, Errors.FENCED_INSTANCE_ID)
       } else if (!group.has(memberId)) {
@@ -417,16 +417,12 @@ class GroupCoordinator(val brokerId: Int,
 
     groupManager.getGroup(groupId) match {
       case None =>
-        // if the group is marked as dead, it means some other thread has just 
removed the group
-        // from the coordinator metadata; it is likely that the group has 
migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the 
consumer to retry
-        // joining without specified consumer id,
         responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
       case Some(group) =>
         group.inLock {
           if (group.is(Dead)) {
-            responseCallback(Errors.UNKNOWN_MEMBER_ID)
+            responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
           } else if (group.isPendingMember(memberId)) {
             // if a pending member is leaving, it needs to be removed from the 
pending list, heartbeat cancelled
             // and if necessary, prompt a JoinGroup completion.
@@ -509,10 +505,10 @@ class GroupCoordinator(val brokerId: Int,
       case Some(group) => group.inLock {
         if (group.is(Dead)) {
           // if the group is marked as dead, it means some other thread has 
just removed the group
-          // from the coordinator metadata; it is likely that the group has 
migrated to some other
+          // from the coordinator metadata; this is likely that the group has 
migrated to some other
           // coordinator OR the group is in a transient unstable phase. Let 
the member retry
-          // joining without the specified member id.
-          responseCallback(Errors.UNKNOWN_MEMBER_ID)
+          // finding the correct coordinator and rejoin.
+          responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
         } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
           responseCallback(Errors.FENCED_INSTANCE_ID)
         } else if (!group.has(memberId)) {
@@ -609,8 +605,8 @@ class GroupCoordinator(val brokerId: Int,
         // if the group is marked as dead, it means some other thread has just 
removed the group
         // from the coordinator metadata; it is likely that the group has 
migrated to some other
         // coordinator OR the group is in a transient unstable phase. Let the 
member retry
-        // joining without the specified member id.
-        responseCallback(offsetMetadata.mapValues(_ => 
Errors.UNKNOWN_MEMBER_ID))
+        // finding the correct coordinator and rejoin.
+        responseCallback(offsetMetadata.mapValues(_ => 
Errors.COORDINATOR_NOT_AVAILABLE))
       } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
         responseCallback(offsetMetadata.mapValues(_ => 
Errors.FENCED_INSTANCE_ID))
       } else if ((generationId < 0 && group.is(Empty)) || (producerId != 
NO_PRODUCER_ID)) {
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 770868c..5a1b20a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -386,7 +386,18 @@ class GroupCoordinatorTest {
 
     groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, 
Dead, new MockTime()))
     val joinGroupResult = dynamicJoinGroup(deadGroupId, memberId, 
protocolType, protocols)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
+    assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, joinGroupResult.error)
+  }
+
+  @Test
+  def testSyncDeadGroup() {
+    val memberId = "memberId"
+
+    val deadGroupId = "deadGroupId"
+
+    groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, 
Dead, new MockTime()))
+    val syncGroupResult = syncGroupFollower(deadGroupId, 1, memberId)
+    assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, syncGroupResult._2)
   }
 
   @Test
@@ -507,7 +518,7 @@ class GroupCoordinatorTest {
 
     EasyMock.reset(replicaManager)
     val joinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, 
leaderInstanceId, protocolType, protocols, clockAdvance = 1)
-    assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
+    assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, joinGroupResult.error)
   }
 
   @Test
@@ -716,6 +727,19 @@ class GroupCoordinatorTest {
   }
 
   @Test
+  def testOffsetCommitDeadGroup() {
+    val memberId = "memberId"
+
+    val deadGroupId = "deadGroupId"
+    val tp = new TopicPartition("topic", 0)
+    val offset = offsetAndMetadata(0)
+
+    groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, 
Dead, new MockTime()))
+    val offsetCommitResult = commitOffsets(deadGroupId, memberId, 1, Map(tp -> 
offset))
+    assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, offsetCommitResult(tp))
+  }
+
+  @Test
   def staticMemberCommitOffsetWithInvalidMemberId() {
     val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, 
followerInstanceId)
 
@@ -999,12 +1023,22 @@ class GroupCoordinatorTest {
 
   @Test
   def testHeartbeatUnknownGroup() {
-
     val heartbeatResult = heartbeat(groupId, memberId, -1)
     assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
   }
 
   @Test
+  def testheartbeatDeadGroup() {
+    val memberId = "memberId"
+
+    val deadGroupId = "deadGroupId"
+
+    groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, 
Dead, new MockTime()))
+    val heartbeatResult = heartbeat(deadGroupId, memberId, 1)
+    assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, heartbeatResult)
+  }
+
+  @Test
   def testHeartbeatUnknownConsumerExistingGroup() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val otherMemberId = "memberId"
@@ -1304,9 +1338,7 @@ class GroupCoordinatorTest {
 
   @Test
   def testSyncGroupFromUnknownGroup() {
-    val generation = 1
-
-    val syncGroupResult = syncGroupFollower(groupId, generation, memberId)
+    val syncGroupResult = syncGroupFollower(groupId, 1, memberId)
     assertEquals(Errors.UNKNOWN_MEMBER_ID, syncGroupResult._2)
   }
 
@@ -2179,12 +2211,22 @@ class GroupCoordinatorTest {
 
   @Test
   def testLeaveGroupUnknownGroup() {
-
     val leaveGroupResult = leaveGroup(groupId, memberId)
     assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResult)
   }
 
   @Test
+  def testLeaveDeadGroup() {
+    val memberId = "memberId"
+
+    val deadGroupId = "deadGroupId"
+
+    groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, 
Dead, new MockTime()))
+    val leaveGroupResult = leaveGroup(deadGroupId, memberId)
+    assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, leaveGroupResult)
+  }
+
+  @Test
   def testLeaveGroupUnknownConsumerExistingGroup() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val otherMemberId = "consumerId"

Reply via email to