Repository: kafka
Updated Branches:
  refs/heads/0.10.1 be20ea528 -> c9f7fa29d
KAFKA-3782: Ensure heartbeat thread restarted after rebalance woken up Author: Jason Gustafson <ja...@confluent.io> Reviewers: Guozhang Wang Closes #1898 from hachikuji/KAFKA-3782 (cherry picked from commit 732fabf94ebc9631d31f2feb2116ee8b63beabef) Signed-off-by: Guozhang Wang <wangg...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9f7fa29 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9f7fa29 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9f7fa29 Branch: refs/heads/0.10.1 Commit: c9f7fa29d1b0c165f13ff6e18442580b04cd8ec9 Parents: be20ea5 Author: Jason Gustafson <ja...@confluent.io> Authored: Thu Sep 22 10:07:50 2016 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Thu Sep 22 10:07:59 2016 -0700 ---------------------------------------------------------------------- .../consumer/internals/AbstractCoordinator.java | 23 +- .../internals/AbstractCoordinatorTest.java | 323 ++++++++++++++++++- 2 files changed, 330 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c9f7fa29/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 159ac27..73543ad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -232,6 +232,10 @@ public abstract class AbstractCoordinator implements Closeable { return rejoinNeeded; } + private synchronized boolean rejoinIncomplete() { + return joinFuture != null; + } + /** * Check the 
status of the heartbeat thread (if it is active) and indicate the liveness * of the client. This must be called periodically after joining with {@link #ensureActiveGroup()} @@ -287,7 +291,7 @@ public abstract class AbstractCoordinator implements Closeable { // visible for testing. Joins the group without starting the heartbeat thread. void joinGroupIfNeeded() { - while (needRejoin()) { + while (needRejoin() || rejoinIncomplete()) { ensureCoordinatorReady(); // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second @@ -300,18 +304,6 @@ public abstract class AbstractCoordinator implements Closeable { needsJoinPrepare = false; } - // fence off the heartbeat thread explicitly so that it cannot interfere with the join group. - // Note that this must come after the call to onJoinPrepare since we must be able to continue - // sending heartbeats if that callback takes some time. - disableHeartbeatThread(); - - // ensure that there are no pending requests to the coordinator. This is important - // in particular to avoid resending a pending JoinGroup request. - if (client.pendingRequestCount(this.coordinator) > 0) { - client.awaitPendingRequests(this.coordinator); - continue; - } - RequestFuture<ByteBuffer> future = initiateJoinGroup(); client.poll(future); resetJoinGroupFuture(); @@ -341,6 +333,11 @@ public abstract class AbstractCoordinator implements Closeable { // rebalance in the call to poll below. This ensures that we do not mistakenly attempt // to rejoin before the pending rebalance has completed. if (joinFuture == null) { + // fence off the heartbeat thread explicitly so that it cannot interfere with the join group. + // Note that this must come after the call to onJoinPrepare since we must be able to continue + // sending heartbeats if that callback takes some time. 
+ disableHeartbeatThread(); + state = MemberState.REBALANCING; joinFuture = sendJoinGroupRequest(); joinFuture.addListener(new RequestFutureListener<ByteBuffer>() { http://git-wip-us.apache.org/repos/asf/kafka/blob/c9f7fa29/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 4f8425a..3c8c793 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -32,6 +33,7 @@ import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; @@ -41,6 +43,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -106,7 +109,6 @@ public class AbstractCoordinatorTest { mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); 
mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); - final RuntimeException e = new RuntimeException(); // raise the error when the background thread tries to send a heartbeat @@ -152,6 +154,318 @@ public class AbstractCoordinatorTest { assertTrue("New request not sent after previous completed", future != coordinator.lookupCoordinator()); } + @Test + public void testWakeupAfterJoinGroupSent() throws Exception { + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + mockClient.prepareResponse(new MockClient.RequestMatcher() { + private int invocations = 0; + @Override + public boolean matches(ClientRequest request) { + invocations++; + boolean isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id; + if (isJoinGroupRequest && invocations == 1) + // simulate wakeup before the request returns + throw new WakeupException(); + return isJoinGroupRequest; + } + }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); + mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); + AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); + + try { + coordinator.ensureActiveGroup(); + fail("Should have woken up from ensureActiveGroup()"); + } catch (WakeupException e) { + } + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(0, coordinator.onJoinCompleteInvokes); + assertFalse(heartbeatReceived.get()); + + coordinator.ensureActiveGroup(); + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(1, coordinator.onJoinCompleteInvokes); + + awaitFirstHeartbeat(heartbeatReceived); + } + + @Test + public void testWakeupAfterJoinGroupSentExternalCompletion() throws Exception { + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + mockClient.prepareResponse(new MockClient.RequestMatcher() { + private int invocations = 0; + @Override + public boolean matches(ClientRequest request) { + invocations++; + boolean isJoinGroupRequest = 
request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id; + if (isJoinGroupRequest && invocations == 1) + // simulate wakeup before the request returns + throw new WakeupException(); + return isJoinGroupRequest; + } + }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); + mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); + AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); + + try { + coordinator.ensureActiveGroup(); + fail("Should have woken up from ensureActiveGroup()"); + } catch (WakeupException e) { + } + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(0, coordinator.onJoinCompleteInvokes); + assertFalse(heartbeatReceived.get()); + + // the join group completes in this poll() + consumerClient.poll(0); + coordinator.ensureActiveGroup(); + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(1, coordinator.onJoinCompleteInvokes); + + awaitFirstHeartbeat(heartbeatReceived); + } + + @Test + public void testWakeupAfterJoinGroupReceived() throws Exception { + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + mockClient.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + boolean isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id; + if (isJoinGroupRequest) + // wakeup after the request returns + consumerClient.wakeup(); + return isJoinGroupRequest; + } + }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); + mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); + AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); + + try { + coordinator.ensureActiveGroup(); + fail("Should have woken up from ensureActiveGroup()"); + } catch (WakeupException e) { + } + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(0, coordinator.onJoinCompleteInvokes); + assertFalse(heartbeatReceived.get()); + + coordinator.ensureActiveGroup(); + + 
assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(1, coordinator.onJoinCompleteInvokes); + + awaitFirstHeartbeat(heartbeatReceived); + } + + @Test + public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exception { + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + mockClient.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + boolean isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id; + if (isJoinGroupRequest) + // wakeup after the request returns + consumerClient.wakeup(); + return isJoinGroupRequest; + } + }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); + mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); + AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); + + try { + coordinator.ensureActiveGroup(); + fail("Should have woken up from ensureActiveGroup()"); + } catch (WakeupException e) { + } + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(0, coordinator.onJoinCompleteInvokes); + assertFalse(heartbeatReceived.get()); + + // the join group completes in this poll() + consumerClient.poll(0); + coordinator.ensureActiveGroup(); + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(1, coordinator.onJoinCompleteInvokes); + + awaitFirstHeartbeat(heartbeatReceived); + } + + @Test + public void testWakeupAfterSyncGroupSent() throws Exception { + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); + mockClient.prepareResponse(new MockClient.RequestMatcher() { + private int invocations = 0; + @Override + public boolean matches(ClientRequest request) { + invocations++; + boolean isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id; + if (isSyncGroupRequest && invocations == 1) + // simulate wakeup after 
the request sent + throw new WakeupException(); + return isSyncGroupRequest; + } + }, syncGroupResponse(Errors.NONE)); + AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); + + try { + coordinator.ensureActiveGroup(); + fail("Should have woken up from ensureActiveGroup()"); + } catch (WakeupException e) { + } + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(0, coordinator.onJoinCompleteInvokes); + assertFalse(heartbeatReceived.get()); + + coordinator.ensureActiveGroup(); + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(1, coordinator.onJoinCompleteInvokes); + + awaitFirstHeartbeat(heartbeatReceived); + } + + @Test + public void testWakeupAfterSyncGroupSentExternalCompletion() throws Exception { + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); + mockClient.prepareResponse(new MockClient.RequestMatcher() { + private int invocations = 0; + @Override + public boolean matches(ClientRequest request) { + invocations++; + boolean isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id; + if (isSyncGroupRequest && invocations == 1) + // simulate wakeup after the request sent + throw new WakeupException(); + return isSyncGroupRequest; + } + }, syncGroupResponse(Errors.NONE)); + AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); + + try { + coordinator.ensureActiveGroup(); + fail("Should have woken up from ensureActiveGroup()"); + } catch (WakeupException e) { + } + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(0, coordinator.onJoinCompleteInvokes); + assertFalse(heartbeatReceived.get()); + + // the join group completes in this poll() + consumerClient.poll(0); + coordinator.ensureActiveGroup(); + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(1, coordinator.onJoinCompleteInvokes); + + awaitFirstHeartbeat(heartbeatReceived); + } + + 
@Test + public void testWakeupAfterSyncGroupReceived() throws Exception { + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); + mockClient.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + boolean isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id; + if (isSyncGroupRequest) + // wakeup after the request returns + consumerClient.wakeup(); + return isSyncGroupRequest; + } + }, syncGroupResponse(Errors.NONE)); + AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); + + try { + coordinator.ensureActiveGroup(); + fail("Should have woken up from ensureActiveGroup()"); + } catch (WakeupException e) { + } + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(0, coordinator.onJoinCompleteInvokes); + assertFalse(heartbeatReceived.get()); + + coordinator.ensureActiveGroup(); + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(1, coordinator.onJoinCompleteInvokes); + + awaitFirstHeartbeat(heartbeatReceived); + } + + @Test + public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exception { + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); + mockClient.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + boolean isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id; + if (isSyncGroupRequest) + // wakeup after the request returns + consumerClient.wakeup(); + return isSyncGroupRequest; + } + }, syncGroupResponse(Errors.NONE)); + AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); + + try { + coordinator.ensureActiveGroup(); + fail("Should have woken up from ensureActiveGroup()"); + } catch 
(WakeupException e) { + } + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(0, coordinator.onJoinCompleteInvokes); + assertFalse(heartbeatReceived.get()); + + // the join group completes in this poll() + consumerClient.poll(0); + coordinator.ensureActiveGroup(); + + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(1, coordinator.onJoinCompleteInvokes); + + awaitFirstHeartbeat(heartbeatReceived); + } + + private AtomicBoolean prepareFirstHeartbeat() { + final AtomicBoolean heartbeatReceived = new AtomicBoolean(false); + mockClient.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + boolean isHeartbeatRequest = request.request().header().apiKey() == ApiKeys.HEARTBEAT.id; + if (isHeartbeatRequest) + heartbeatReceived.set(true); + return isHeartbeatRequest; + } + }, heartbeatResponse(Errors.UNKNOWN)); + return heartbeatReceived; + } + + private void awaitFirstHeartbeat(final AtomicBoolean heartbeatReceived) throws Exception { + mockTime.sleep(HEARTBEAT_INTERVAL_MS); + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return heartbeatReceived.get(); + } + }, 3000, "Should have received a heartbeat request after joining the group"); + } + private Struct groupCoordinatorResponse(Node node, Errors error) { GroupCoordinatorResponse response = new GroupCoordinatorResponse(error.code(), node); return response.toStruct(); @@ -173,6 +487,9 @@ public class AbstractCoordinatorTest { public class DummyCoordinator extends AbstractCoordinator { + private int onJoinPrepareInvokes = 0; + private int onJoinCompleteInvokes = 0; + public DummyCoordinator(ConsumerNetworkClient client, Metrics metrics, Time time) { @@ -200,12 +517,12 @@ public class AbstractCoordinatorTest { @Override protected void onJoinPrepare(int generation, String memberId) { - + onJoinPrepareInvokes++; } @Override protected void onJoinComplete(int generation, String 
memberId, String protocol, ByteBuffer memberAssignment) { - + onJoinCompleteInvokes++; } }