This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 78679cf  KAFKA-9140: Also reset join future when generation was reset in order to re-join (#7647)
78679cf is described below

commit 78679cf8ce283648b4b934f098edac3fb7916f1a
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Wed Nov 6 09:47:08 2019 -0800

    KAFKA-9140: Also reset join future when generation was reset in order to re-join (#7647)
    
    Otherwise the join-group request would not be resent and we'd just fall into an endless loop.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Boyang Chen 
<boy...@confluent.io>, A. Sophie Blee-Goldman <sop...@confluent.io>
---
 .../clients/consumer/internals/AbstractCoordinator.java    | 10 ++++++----
 .../consumer/internals/ConsumerCoordinatorTest.java        | 14 ++++++++++++--
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 3a76276..62f720e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -410,7 +410,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 // Can't use synchronized for {@code onJoinComplete}, because 
it can be long enough
                 // and  shouldn't block hearbeat thread.
                 // See {@link 
PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment
-                synchronized (this) {
+                synchronized (AbstractCoordinator.this) {
                     generationSnapshot = this.generation;
                 }
 
@@ -420,14 +420,16 @@ public abstract class AbstractCoordinator implements 
Closeable {
 
                     onJoinComplete(generationSnapshot.generationId, 
generationSnapshot.memberId, generationSnapshot.protocol, memberAssignment);
 
-                    // We reset the join group future only after the 
completion callback returns. This ensures
+                    // Generally speaking we should always 
resetJoinGroupFuture once the future is done, but here
+                    // we can only reset the join group future after the 
completion callback returns. This ensures
                     // that if the callback is woken up, we will retry it on 
the next joinGroupIfNeeded.
+                    // And because of that we should explicitly trigger 
resetJoinGroupFuture in other conditions below.
                     resetJoinGroupFuture();
                     needsJoinPrepare = true;
                 } else {
                     log.info("Generation data was cleared by heartbeat thread. 
Initiating rejoin.");
                     resetStateAndRejoin();
-
+                    resetJoinGroupFuture();
                     return false;
                 }
             } else {
@@ -451,7 +453,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         this.joinFuture = null;
     }
 
-    private void resetStateAndRejoin() {
+    private synchronized void resetStateAndRejoin() {
         rejoinNeeded = true;
         state = MemberState.UNJOINED;
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 5ff9761..6617fa2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2333,7 +2333,7 @@ public class ConsumerCoordinatorTest {
 
             assertFalse(res);
             assertFalse(client.hasPendingResponses());
-            //SynGroupRequest not responded.
+            // SynGroupRequest not responded.
             assertEquals(1, client.inFlightRequestCount());
             assertEquals(generationId, coordinator.generation().generationId);
             assertEquals(memberId, coordinator.generation().memberId);
@@ -2345,12 +2345,22 @@ public class ConsumerCoordinatorTest {
 
             client.respond(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-            //Join future should succeed but generation already cleared so 
result of join is false.
+            // Join future should succeed but generation already cleared so 
result of join is false.
             res = coordinator.joinGroupIfNeeded(time.timer(1));
 
             assertFalse(res);
             assertFalse(client.hasPendingResponses());
             assertFalse(client.hasInFlightRequests());
+
+            // Retry join should then succeed
+            client.prepareResponse(joinGroupFollowerResponse(generationId, 
memberId, "leader", Errors.NONE));
+            client.prepareResponse(syncGroupResponse(singletonList(t1p), 
Errors.NONE));
+
+            res = coordinator.joinGroupIfNeeded(time.timer(2));
+
+            assertTrue(res);
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
         }
     }
 

Reply via email to