Repository: kafka Updated Branches: refs/heads/trunk fe11488a7 -> ae5a5d7c0
KAFKA-2792: Don't wait for a response to the leave group message when closing the new consumer. Author: Ewen Cheslack-Postava <m...@ewencp.org> Reviewers: Onur Karaman, Gwen Shapira Closes #480 from ewencp/kafka-2792-fix-blocking-consumer-close Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ae5a5d7c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ae5a5d7c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ae5a5d7c Branch: refs/heads/trunk Commit: ae5a5d7c08bb634576a414f6f2864c5b8a7e58a3 Parents: fe11488 Author: Ewen Cheslack-Postava <m...@ewencp.org> Authored: Tue Nov 10 10:26:51 2015 -0800 Committer: Gwen Shapira <csh...@gmail.com> Committed: Tue Nov 10 10:26:51 2015 -0800 ---------------------------------------------------------------------- .../apache/kafka/clients/consumer/KafkaConsumer.java | 2 +- .../consumer/internals/AbstractCoordinator.java | 13 +++++-------- .../consumer/internals/ConsumerCoordinatorTest.java | 4 ++-- 3 files changed, 8 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ae5a5d7c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index d3616f9..89b2f0b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -760,7 +760,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { try { log.debug("Unsubscribed all topics or patterns and assigned partitions"); this.subscriptions.unsubscribe(); - this.coordinator.maybeLeaveGroup(false); + this.coordinator.maybeLeaveGroup(); this.metadata.needMetadataForAllTopics(false); } finally { release(); http://git-wip-us.apache.org/repos/asf/kafka/blob/ae5a5d7c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 5b5c8a5..a12c6c1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -540,18 +540,18 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void close() { client.disableWakeups(); - maybeLeaveGroup(true); + maybeLeaveGroup(); } /** * Leave the current group and reset local generation/memberId. */ - public void maybeLeaveGroup(boolean awaitResponse) { + public void maybeLeaveGroup() { client.unschedule(heartbeatTask); if (!coordinatorUnknown() && generation > 0) { // this is a minimal effort attempt to leave the group. we do not // attempt any resending if the request fails or times out. - sendLeaveGroupRequest(awaitResponse); + sendLeaveGroupRequest(); } this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID; @@ -559,7 +559,7 @@ public abstract class AbstractCoordinator implements Closeable { rejoinNeeded = true; } - private void sendLeaveGroupRequest(boolean awaitResponse) { + private void sendLeaveGroupRequest() { LeaveGroupRequest request = new LeaveGroupRequest(groupId, memberId); RequestFuture<Void> future = client.send(coordinator, ApiKeys.LEAVE_GROUP, request) .compose(new LeaveGroupResponseHandler()); @@ -574,10 +574,7 @@ public abstract class AbstractCoordinator implements Closeable { } }); - if (awaitResponse) - client.poll(future); - else - client.poll(future, 0); + client.poll(future, 0); } private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> { http://git-wip-us.apache.org/repos/asf/kafka/blob/ae5a5d7c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 86ac6b3..500aaed 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -418,7 +418,7 @@ public class ConsumerCoordinatorTest { leaveRequest.groupId().equals(groupId); } }, new LeaveGroupResponse(Errors.NONE.code()).toStruct()); - coordinator.maybeLeaveGroup(false); + coordinator.maybeLeaveGroup(); assertTrue(received.get()); assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, coordinator.memberId); assertEquals(OffsetCommitRequest.DEFAULT_GENERATION_ID, coordinator.generation); @@ -672,7 +672,7 @@ public class ConsumerCoordinatorTest { // now switch to manual assignment client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()).toStruct()); subscriptions.unsubscribe(); - coordinator.maybeLeaveGroup(false); + coordinator.maybeLeaveGroup(); subscriptions.assignFromUser(Arrays.asList(tp)); // the client should not reuse generation/memberId from auto-subscribed generation