Repository: kafka Updated Branches: refs/heads/trunk 913c09e4a -> 5916ef022
KAFKA-4786; Wait for heartbeat thread to terminate in consumer close

Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Reviewers: Apurva Mehta <apu...@confluent.io>, Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #2586 from rajinisivaram/KAFKA-4786

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5916ef02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5916ef02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5916ef02

Branch: refs/heads/trunk
Commit: 5916ef0227d099e5fa05341db3f918f5ef035816
Parents: 913c09e
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Wed Feb 22 12:08:09 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Feb 22 12:08:09 2017 -0800

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 46 ++++++++++++++------
 1 file changed, 33 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/5916ef02/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 1c2d607..d36aac9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -315,6 +315,19 @@ public abstract class AbstractCoordinator implements Closeable {
         heartbeatThread.disable();
     }
 
+    private void closeHeartbeatThread() {
+        if (heartbeatThread != null) {
+            heartbeatThread.close();
+
+            try {
+                heartbeatThread.join();
+            } catch (InterruptedException e) {
+                log.warn("Interrupted while waiting for consumer heartbeat thread to close");
+                throw new InterruptException(e);
+            }
+        }
+    }
+
     // visible for testing. Joins the group without starting the heartbeat thread.
     void joinGroupIfNeeded() {
         while (needRejoin() || rejoinIncomplete()) {
@@ -652,19 +665,26 @@ public abstract class AbstractCoordinator implements Closeable {
         close(0);
     }
 
-    protected synchronized void close(long timeoutMs) {
-        if (heartbeatThread != null)
-            heartbeatThread.close();
-        maybeLeaveGroup();
-
-        // At this point, there may be pending commits (async commits or sync commits that were
-        // interrupted using wakeup) and the leave group request which have been queued, but not
-        // yet sent to the broker. Wait up to close timeout for these pending requests to be processed.
-        // If coordinator is not known, requests are aborted.
-        Node coordinator = coordinator();
-        if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs))
-            log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.",
-                    client.pendingRequestCount(coordinator), groupId);
+    protected void close(long timeoutMs) {
+        try {
+            closeHeartbeatThread();
+        } finally {
+
+            // Synchronize after closing the heartbeat thread since heartbeat thread
+            // needs this lock to complete and terminate after close flag is set.
+            synchronized (this) {
+                maybeLeaveGroup();
+
+                // At this point, there may be pending commits (async commits or sync commits that were
+                // interrupted using wakeup) and the leave group request which have been queued, but not
+                // yet sent to the broker. Wait up to close timeout for these pending requests to be processed.
+                // If coordinator is not known, requests are aborted.
+                Node coordinator = coordinator();
+                if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs))
+                    log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.",
+                            client.pendingRequestCount(coordinator), groupId);
+            }
+        }
     }
 
     /**