Repository: kafka
Updated Branches:
  refs/heads/0.10.1 be20ea528 -> c9f7fa29d


KAFKA-3782: Ensure heartbeat thread restarted after rebalance woken up

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang

Closes #1898 from hachikuji/KAFKA-3782

(cherry picked from commit 732fabf94ebc9631d31f2feb2116ee8b63beabef)
Signed-off-by: Guozhang Wang <wangg...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9f7fa29
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9f7fa29
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9f7fa29

Branch: refs/heads/0.10.1
Commit: c9f7fa29d1b0c165f13ff6e18442580b04cd8ec9
Parents: be20ea5
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Sep 22 10:07:50 2016 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Thu Sep 22 10:07:59 2016 -0700

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java |  23 +-
 .../internals/AbstractCoordinatorTest.java      | 323 ++++++++++++++++++-
 2 files changed, 330 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c9f7fa29/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 159ac27..73543ad 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -232,6 +232,10 @@ public abstract class AbstractCoordinator implements 
Closeable {
         return rejoinNeeded;
     }
 
+    private synchronized boolean rejoinIncomplete() {
+        return joinFuture != null;
+    }
+
     /**
      * Check the status of the heartbeat thread (if it is active) and indicate 
the liveness
      * of the client. This must be called periodically after joining with 
{@link #ensureActiveGroup()}
@@ -287,7 +291,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
 
     // visible for testing. Joins the group without starting the heartbeat 
thread.
     void joinGroupIfNeeded() {
-        while (needRejoin()) {
+        while (needRejoin() || rejoinIncomplete()) {
             ensureCoordinatorReady();
 
             // call onJoinPrepare if needed. We set a flag to make sure that 
we do not call it a second
@@ -300,18 +304,6 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 needsJoinPrepare = false;
             }
 
-            // fence off the heartbeat thread explicitly so that it cannot 
interfere with the join group.
-            // Note that this must come after the call to onJoinPrepare since 
we must be able to continue
-            // sending heartbeats if that callback takes some time.
-            disableHeartbeatThread();
-
-            // ensure that there are no pending requests to the coordinator. 
This is important
-            // in particular to avoid resending a pending JoinGroup request.
-            if (client.pendingRequestCount(this.coordinator) > 0) {
-                client.awaitPendingRequests(this.coordinator);
-                continue;
-            }
-
             RequestFuture<ByteBuffer> future = initiateJoinGroup();
             client.poll(future);
             resetJoinGroupFuture();
@@ -341,6 +333,11 @@ public abstract class AbstractCoordinator implements 
Closeable {
         // rebalance in the call to poll below. This ensures that we do not 
mistakenly attempt
         // to rejoin before the pending rebalance has completed.
         if (joinFuture == null) {
+            // fence off the heartbeat thread explicitly so that it cannot 
interfere with the join group.
+            // Note that this must come after the call to onJoinPrepare since 
we must be able to continue
+            // sending heartbeats if that callback takes some time.
+            disableHeartbeatThread();
+
             state = MemberState.REBALANCING;
             joinFuture = sendJoinGroupRequest();
             joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9f7fa29/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 4f8425a..3c8c793 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -32,6 +33,7 @@ import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,6 +43,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -106,7 +109,6 @@ public class AbstractCoordinatorTest {
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", 
"leaderId", Errors.NONE));
         mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
 
-
         final RuntimeException e = new RuntimeException();
 
         // raise the error when the background thread tries to send a heartbeat
@@ -152,6 +154,318 @@ public class AbstractCoordinatorTest {
         assertTrue("New request not sent after previous completed", future != 
coordinator.lookupCoordinator());
     }
 
+    @Test
+    public void testWakeupAfterJoinGroupSent() throws Exception {
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        mockClient.prepareResponse(new MockClient.RequestMatcher() {
+            private int invocations = 0;
+            @Override
+            public boolean matches(ClientRequest request) {
+                invocations++;
+                boolean isJoinGroupRequest = 
request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
+                if (isJoinGroupRequest && invocations == 1)
+                    // simulate wakeup before the request returns
+                    throw new WakeupException();
+                return isJoinGroupRequest;
+            }
+        }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+        AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
+
+        try {
+            coordinator.ensureActiveGroup();
+            fail("Should have woken up from ensureActiveGroup()");
+        } catch (WakeupException e) {
+        }
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(0, coordinator.onJoinCompleteInvokes);
+        assertFalse(heartbeatReceived.get());
+
+        coordinator.ensureActiveGroup();
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(1, coordinator.onJoinCompleteInvokes);
+
+        awaitFirstHeartbeat(heartbeatReceived);
+    }
+
+    @Test
+    public void testWakeupAfterJoinGroupSentExternalCompletion() throws 
Exception {
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        mockClient.prepareResponse(new MockClient.RequestMatcher() {
+            private int invocations = 0;
+            @Override
+            public boolean matches(ClientRequest request) {
+                invocations++;
+                boolean isJoinGroupRequest = 
request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
+                if (isJoinGroupRequest && invocations == 1)
+                    // simulate wakeup before the request returns
+                    throw new WakeupException();
+                return isJoinGroupRequest;
+            }
+        }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+        AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
+
+        try {
+            coordinator.ensureActiveGroup();
+            fail("Should have woken up from ensureActiveGroup()");
+        } catch (WakeupException e) {
+        }
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(0, coordinator.onJoinCompleteInvokes);
+        assertFalse(heartbeatReceived.get());
+
+        // the join group completes in this poll()
+        consumerClient.poll(0);
+        coordinator.ensureActiveGroup();
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(1, coordinator.onJoinCompleteInvokes);
+
+        awaitFirstHeartbeat(heartbeatReceived);
+    }
+
+    @Test
+    public void testWakeupAfterJoinGroupReceived() throws Exception {
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        mockClient.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                boolean isJoinGroupRequest = 
request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
+                if (isJoinGroupRequest)
+                    // wakeup after the request returns
+                    consumerClient.wakeup();
+                return isJoinGroupRequest;
+            }
+        }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+        AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
+
+        try {
+            coordinator.ensureActiveGroup();
+            fail("Should have woken up from ensureActiveGroup()");
+        } catch (WakeupException e) {
+        }
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(0, coordinator.onJoinCompleteInvokes);
+        assertFalse(heartbeatReceived.get());
+
+        coordinator.ensureActiveGroup();
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(1, coordinator.onJoinCompleteInvokes);
+
+        awaitFirstHeartbeat(heartbeatReceived);
+    }
+
+    @Test
+    public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws 
Exception {
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        mockClient.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                boolean isJoinGroupRequest = 
request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
+                if (isJoinGroupRequest)
+                    // wakeup after the request returns
+                    consumerClient.wakeup();
+                return isJoinGroupRequest;
+            }
+        }, joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+        AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
+
+        try {
+            coordinator.ensureActiveGroup();
+            fail("Should have woken up from ensureActiveGroup()");
+        } catch (WakeupException e) {
+        }
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(0, coordinator.onJoinCompleteInvokes);
+        assertFalse(heartbeatReceived.get());
+
+        // the join group completes in this poll()
+        consumerClient.poll(0);
+        coordinator.ensureActiveGroup();
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(1, coordinator.onJoinCompleteInvokes);
+
+        awaitFirstHeartbeat(heartbeatReceived);
+    }
+
+    @Test
+    public void testWakeupAfterSyncGroupSent() throws Exception {
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", 
"leaderId", Errors.NONE));
+        mockClient.prepareResponse(new MockClient.RequestMatcher() {
+            private int invocations = 0;
+            @Override
+            public boolean matches(ClientRequest request) {
+                invocations++;
+                boolean isSyncGroupRequest = 
request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
+                if (isSyncGroupRequest && invocations == 1)
+                    // simulate wakeup after the request sent
+                    throw new WakeupException();
+                return isSyncGroupRequest;
+            }
+        }, syncGroupResponse(Errors.NONE));
+        AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
+
+        try {
+            coordinator.ensureActiveGroup();
+            fail("Should have woken up from ensureActiveGroup()");
+        } catch (WakeupException e) {
+        }
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(0, coordinator.onJoinCompleteInvokes);
+        assertFalse(heartbeatReceived.get());
+
+        coordinator.ensureActiveGroup();
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(1, coordinator.onJoinCompleteInvokes);
+
+        awaitFirstHeartbeat(heartbeatReceived);
+    }
+
+    @Test
+    public void testWakeupAfterSyncGroupSentExternalCompletion() throws 
Exception {
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", 
"leaderId", Errors.NONE));
+        mockClient.prepareResponse(new MockClient.RequestMatcher() {
+            private int invocations = 0;
+            @Override
+            public boolean matches(ClientRequest request) {
+                invocations++;
+                boolean isSyncGroupRequest = 
request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
+                if (isSyncGroupRequest && invocations == 1)
+                    // simulate wakeup after the request sent
+                    throw new WakeupException();
+                return isSyncGroupRequest;
+            }
+        }, syncGroupResponse(Errors.NONE));
+        AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
+
+        try {
+            coordinator.ensureActiveGroup();
+            fail("Should have woken up from ensureActiveGroup()");
+        } catch (WakeupException e) {
+        }
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(0, coordinator.onJoinCompleteInvokes);
+        assertFalse(heartbeatReceived.get());
+
+        // the join group completes in this poll()
+        consumerClient.poll(0);
+        coordinator.ensureActiveGroup();
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(1, coordinator.onJoinCompleteInvokes);
+
+        awaitFirstHeartbeat(heartbeatReceived);
+    }
+
+    @Test
+    public void testWakeupAfterSyncGroupReceived() throws Exception {
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", 
"leaderId", Errors.NONE));
+        mockClient.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                boolean isSyncGroupRequest = 
request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
+                if (isSyncGroupRequest)
+                    // wakeup after the request returns
+                    consumerClient.wakeup();
+                return isSyncGroupRequest;
+            }
+        }, syncGroupResponse(Errors.NONE));
+        AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
+
+        try {
+            coordinator.ensureActiveGroup();
+            fail("Should have woken up from ensureActiveGroup()");
+        } catch (WakeupException e) {
+        }
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(0, coordinator.onJoinCompleteInvokes);
+        assertFalse(heartbeatReceived.get());
+
+        coordinator.ensureActiveGroup();
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(1, coordinator.onJoinCompleteInvokes);
+
+        awaitFirstHeartbeat(heartbeatReceived);
+    }
+
+    @Test
+    public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws 
Exception {
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", 
"leaderId", Errors.NONE));
+        mockClient.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                boolean isSyncGroupRequest = 
request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
+                if (isSyncGroupRequest)
+                    // wakeup after the request returns
+                    consumerClient.wakeup();
+                return isSyncGroupRequest;
+            }
+        }, syncGroupResponse(Errors.NONE));
+        AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
+
+        try {
+            coordinator.ensureActiveGroup();
+            fail("Should have woken up from ensureActiveGroup()");
+        } catch (WakeupException e) {
+        }
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(0, coordinator.onJoinCompleteInvokes);
+        assertFalse(heartbeatReceived.get());
+
+        // the join group completes in this poll()
+        consumerClient.poll(0);
+        coordinator.ensureActiveGroup();
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(1, coordinator.onJoinCompleteInvokes);
+
+        awaitFirstHeartbeat(heartbeatReceived);
+    }
+
+    private AtomicBoolean prepareFirstHeartbeat() {
+        final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
+        mockClient.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                boolean isHeartbeatRequest = 
request.request().header().apiKey() == ApiKeys.HEARTBEAT.id;
+                if (isHeartbeatRequest)
+                    heartbeatReceived.set(true);
+                return isHeartbeatRequest;
+            }
+        }, heartbeatResponse(Errors.UNKNOWN));
+        return heartbeatReceived;
+    }
+
+    private void awaitFirstHeartbeat(final AtomicBoolean heartbeatReceived) 
throws Exception {
+        mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return heartbeatReceived.get();
+            }
+        }, 3000, "Should have received a heartbeat request after joining the 
group");
+    }
+
     private Struct groupCoordinatorResponse(Node node, Errors error) {
         GroupCoordinatorResponse response = new 
GroupCoordinatorResponse(error.code(), node);
         return response.toStruct();
@@ -173,6 +487,9 @@ public class AbstractCoordinatorTest {
 
     public class DummyCoordinator extends AbstractCoordinator {
 
+        private int onJoinPrepareInvokes = 0;
+        private int onJoinCompleteInvokes = 0;
+
         public DummyCoordinator(ConsumerNetworkClient client,
                                 Metrics metrics,
                                 Time time) {
@@ -200,12 +517,12 @@ public class AbstractCoordinatorTest {
 
         @Override
         protected void onJoinPrepare(int generation, String memberId) {
-
+            onJoinPrepareInvokes++;
         }
 
         @Override
         protected void onJoinComplete(int generation, String memberId, String 
protocol, ByteBuffer memberAssignment) {
-
+            onJoinCompleteInvokes++;
         }
     }
 

Reply via email to