Repository: kafka
Updated Branches:
  refs/heads/trunk fe11488a7 -> ae5a5d7c0


KAFKA-2792: Don't wait for a response to the leave group message when closing 
the new consumer.

Author: Ewen Cheslack-Postava <m...@ewencp.org>

Reviewers: Onur Karaman, Gwen Shapira

Closes #480 from ewencp/kafka-2792-fix-blocking-consumer-close


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ae5a5d7c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ae5a5d7c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ae5a5d7c

Branch: refs/heads/trunk
Commit: ae5a5d7c08bb634576a414f6f2864c5b8a7e58a3
Parents: fe11488
Author: Ewen Cheslack-Postava <m...@ewencp.org>
Authored: Tue Nov 10 10:26:51 2015 -0800
Committer: Gwen Shapira <csh...@gmail.com>
Committed: Tue Nov 10 10:26:51 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/KafkaConsumer.java   |  2 +-
 .../consumer/internals/AbstractCoordinator.java        | 13 +++++--------
 .../consumer/internals/ConsumerCoordinatorTest.java    |  4 ++--
 3 files changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ae5a5d7c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d3616f9..89b2f0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -760,7 +760,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         try {
             log.debug("Unsubscribed all topics or patterns and assigned 
partitions");
             this.subscriptions.unsubscribe();
-            this.coordinator.maybeLeaveGroup(false);
+            this.coordinator.maybeLeaveGroup();
             this.metadata.needMetadataForAllTopics(false);
         } finally {
             release();

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae5a5d7c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 5b5c8a5..a12c6c1 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -540,18 +540,18 @@ public abstract class AbstractCoordinator implements 
Closeable {
     @Override
     public void close() {
         client.disableWakeups();
-        maybeLeaveGroup(true);
+        maybeLeaveGroup();
     }
 
     /**
      * Leave the current group and reset local generation/memberId.
      */
-    public void maybeLeaveGroup(boolean awaitResponse) {
+    public void maybeLeaveGroup() {
         client.unschedule(heartbeatTask);
         if (!coordinatorUnknown() && generation > 0) {
             // this is a minimal effort attempt to leave the group. we do not
             // attempt any resending if the request fails or times out.
-            sendLeaveGroupRequest(awaitResponse);
+            sendLeaveGroupRequest();
         }
 
         this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
@@ -559,7 +559,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         rejoinNeeded = true;
     }
 
-    private void sendLeaveGroupRequest(boolean awaitResponse) {
+    private void sendLeaveGroupRequest() {
         LeaveGroupRequest request = new LeaveGroupRequest(groupId, memberId);
         RequestFuture<Void> future = client.send(coordinator, 
ApiKeys.LEAVE_GROUP, request)
                 .compose(new LeaveGroupResponseHandler());
@@ -574,10 +574,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
             }
         });
 
-        if (awaitResponse)
-            client.poll(future);
-        else
-            client.poll(future, 0);
+        client.poll(future, 0);
     }
 
     private class LeaveGroupResponseHandler extends 
CoordinatorResponseHandler<LeaveGroupResponse, Void> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae5a5d7c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 86ac6b3..500aaed 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -418,7 +418,7 @@ public class ConsumerCoordinatorTest {
                         leaveRequest.groupId().equals(groupId);
             }
         }, new LeaveGroupResponse(Errors.NONE.code()).toStruct());
-        coordinator.maybeLeaveGroup(false);
+        coordinator.maybeLeaveGroup();
         assertTrue(received.get());
         assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, coordinator.memberId);
         assertEquals(OffsetCommitRequest.DEFAULT_GENERATION_ID, 
coordinator.generation);
@@ -672,7 +672,7 @@ public class ConsumerCoordinatorTest {
         // now switch to manual assignment
         client.prepareResponse(new 
LeaveGroupResponse(Errors.NONE.code()).toStruct());
         subscriptions.unsubscribe();
-        coordinator.maybeLeaveGroup(false);
+        coordinator.maybeLeaveGroup();
         subscriptions.assignFromUser(Arrays.asList(tp));
 
         // the client should not reuse generation/memberId from 
auto-subscribed generation

Reply via email to