This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push: new 78679cf KAFKA-9140: Also reset join future when generation was reset in order to re-join (#7647) 78679cf is described below commit 78679cf8ce283648b4b934f098edac3fb7916f1a Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Wed Nov 6 09:47:08 2019 -0800 KAFKA-9140: Also reset join future when generation was reset in order to re-join (#7647) Otherwise the join-group would not be resent and we'd just fall into an endless loop. Reviewers: Jason Gustafson <ja...@confluent.io>, Boyang Chen <boy...@confluent.io>, A. Sophie Blee-Goldman <sop...@confluent.io> --- .../clients/consumer/internals/AbstractCoordinator.java | 10 ++++++---- .../consumer/internals/ConsumerCoordinatorTest.java | 14 ++++++++++++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 3a76276..62f720e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -410,7 +410,7 @@ public abstract class AbstractCoordinator implements Closeable { // Can't use synchronized for {@code onJoinComplete}, because it can be long enough // and shouldn't block hearbeat thread. // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment - synchronized (this) { + synchronized (AbstractCoordinator.this) { generationSnapshot = this.generation; } @@ -420,14 +420,16 @@ public abstract class AbstractCoordinator implements Closeable { onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocol, memberAssignment); - // We reset the join group future only after the completion callback returns. 
This ensures + // Generally speaking we should always resetJoinGroupFuture once the future is done, but here + // we can only reset the join group future after the completion callback returns. This ensures // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded. + // And because of that we should explicitly trigger resetJoinGroupFuture in other conditions below. resetJoinGroupFuture(); needsJoinPrepare = true; } else { log.info("Generation data was cleared by heartbeat thread. Initiating rejoin."); resetStateAndRejoin(); - + resetJoinGroupFuture(); return false; } } else { @@ -451,7 +453,7 @@ public abstract class AbstractCoordinator implements Closeable { this.joinFuture = null; } - private void resetStateAndRejoin() { + private synchronized void resetStateAndRejoin() { rejoinNeeded = true; state = MemberState.UNJOINED; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 5ff9761..6617fa2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -2333,7 +2333,7 @@ public class ConsumerCoordinatorTest { assertFalse(res); assertFalse(client.hasPendingResponses()); - //SynGroupRequest not responded. + // SynGroupRequest not responded. assertEquals(1, client.inFlightRequestCount()); assertEquals(generationId, coordinator.generation().generationId); assertEquals(memberId, coordinator.generation().memberId); @@ -2345,12 +2345,22 @@ public class ConsumerCoordinatorTest { client.respond(syncGroupResponse(singletonList(t1p), Errors.NONE)); - //Join future should succeed but generation already cleared so result of join is false. + // Join future should succeed but generation already cleared so result of join is false. 
res = coordinator.joinGroupIfNeeded(time.timer(1)); assertFalse(res); assertFalse(client.hasPendingResponses()); assertFalse(client.hasInFlightRequests()); + + // Retry join should then succeed + client.prepareResponse(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); + + res = coordinator.joinGroupIfNeeded(time.timer(2)); + + assertTrue(res); + assertFalse(client.hasPendingResponses()); + assertFalse(client.hasInFlightRequests()); } }