Repository: kafka Updated Branches: refs/heads/0.10.2 c231f5280 -> ba4eafa78
KAFKA-4786; Wait for heartbeat thread to terminate in consumer close Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Apurva Mehta <apu...@confluent.io>, Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #2586 from rajinisivaram/KAFKA-4786 (cherry picked from commit 5916ef0227d099e5fa05341db3f918f5ef035816) Signed-off-by: Jason Gustafson <ja...@confluent.io> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ba4eafa7 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ba4eafa7 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ba4eafa7 Branch: refs/heads/0.10.2 Commit: ba4eafa7874988374abcd9f48fbab96abb2032a4 Parents: c231f52 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Wed Feb 22 12:08:09 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Wed Feb 22 12:35:13 2017 -0800 ---------------------------------------------------------------------- .../consumer/internals/AbstractCoordinator.java | 46 ++++++++++++++------ 1 file changed, 33 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ba4eafa7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 6eea045..b72769e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -315,6 +315,19 @@ public abstract class AbstractCoordinator implements Closeable { heartbeatThread.disable(); } + private void closeHeartbeatThread() { + if (heartbeatThread != null) { + heartbeatThread.close(); + + try { + heartbeatThread.join(); + } catch (InterruptedException e) { + log.warn("Interrupted while waiting for consumer heartbeat thread to close"); + throw new InterruptException(e); + } + } + } + // visible for testing. Joins the group without starting the heartbeat thread. void joinGroupIfNeeded() { while (needRejoin() || rejoinIncomplete()) { @@ -652,19 +665,26 @@ public abstract class AbstractCoordinator implements Closeable { close(0); } - protected synchronized void close(long timeoutMs) { - if (heartbeatThread != null) - heartbeatThread.close(); - maybeLeaveGroup(); - - // At this point, there may be pending commits (async commits or sync commits that were - // interrupted using wakeup) and the leave group request which have been queued, but not - // yet sent to the broker. Wait up to close timeout for these pending requests to be processed. - // If coordinator is not known, requests are aborted. - Node coordinator = coordinator(); - if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs)) - log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.", - client.pendingRequestCount(coordinator), groupId); + protected void close(long timeoutMs) { + try { + closeHeartbeatThread(); + } finally { + + // Synchronize after closing the heartbeat thread since heartbeat thread + // needs this lock to complete and terminate after close flag is set. + synchronized (this) { + maybeLeaveGroup(); + + // At this point, there may be pending commits (async commits or sync commits that were + // interrupted using wakeup) and the leave group request which have been queued, but not + // yet sent to the broker. Wait up to close timeout for these pending requests to be processed. + // If coordinator is not known, requests are aborted. + Node coordinator = coordinator(); + if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs)) + log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.", + client.pendingRequestCount(coordinator), groupId); + } + } } /**